Repository: samza Updated Branches: refs/heads/master 52d8ddd6e -> 05113c339
SAMZA-1437; Added tests, make eventData creation extensible vjagadish1989 Required for internal custom eventData creation Author: Daniel Chen <[email protected]> Reviewers: Jagadish<[email protected]> Closes #352 from dxichen/create-event-data-extensible Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/05113c33 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/05113c33 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/05113c33 Branch: refs/heads/master Commit: 05113c3395f92b74e41c75b0ca28153fb3dbf915 Parents: 52d8ddd Author: Daniel Chen <[email protected]> Authored: Tue Nov 7 17:15:28 2017 -0800 Committer: Jagadish <[email protected]> Committed: Tue Nov 7 17:15:28 2017 -0800 ---------------------------------------------------------------------- .../producer/EventHubSystemProducer.java | 2 +- .../consumer/TestEventHubSystemConsumer.java | 56 ++++++++++++++++++- .../producer/SwapFirstLastByteInterceptor.java | 36 +++++++++++++ .../producer/TestEventHubSystemProducer.java | 57 ++++++++++++++++++++ 4 files changed, 149 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java index c8c5538..505421c 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java @@ -271,7 +271,7 @@ public class EventHubSystemProducer implements SystemProducer { } } - private EventData createEventData(String streamName, OutgoingMessageEnvelope envelope) { + protected EventData createEventData(String streamName, OutgoingMessageEnvelope envelope) { Optional<Interceptor> interceptor = Optional.ofNullable(interceptors.getOrDefault(streamName, null)); byte[] eventValue = (byte[]) envelope.getMessage(); if (interceptor.isPresent()) { http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java index b89c805..a25a3b6 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java @@ -27,6 +27,7 @@ import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.eventhub.*; import org.apache.samza.system.eventhub.admin.PassThroughInterceptor; +import org.apache.samza.system.eventhub.producer.SwapFirstLastByteInterceptor; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -46,12 +47,16 @@ public class TestEventHubSystemConsumer { private static final String MOCK_ENTITY_2 = "mocktopic2"; private void verifyEvents(List<IncomingMessageEnvelope> messages, List<EventData> eventDataList) { + verifyEvents(messages, eventDataList, new PassThroughInterceptor()); + } + + private void verifyEvents(List<IncomingMessageEnvelope> messages, List<EventData> eventDataList, Interceptor interceptor) { Assert.assertEquals(messages.size(), eventDataList.size()); for (int i = 0; i < messages.size(); i++) { IncomingMessageEnvelope message = messages.get(i); EventData eventData = eventDataList.get(i); Assert.assertEquals(message.getKey(), eventData.getSystemProperties().getPartitionKey()); - Assert.assertEquals(message.getMessage(), eventData.getBytes()); + Assert.assertEquals(message.getMessage(), interceptor.intercept(eventData.getBytes())); Assert.assertEquals(message.getOffset(), eventData.getSystemProperties().getOffset()); } } @@ -144,6 +149,55 @@ public class TestEventHubSystemConsumer { } @Test + public void testSinglePartitionConsumptionInterceptor() throws Exception { + String systemName = "eventhubs"; + String streamName = "testStream"; + int numEvents = 10; // needs to be less than BLOCKING_QUEUE_SIZE + int partitionId = 0; + Interceptor interceptor = new SwapFirstLastByteInterceptor(); + + TestMetricsRegistry testMetrics = new TestMetricsRegistry(); + Map<SystemStreamPartition, List<EventData>> eventData = new HashMap<>(); + SystemStreamPartition ssp = new SystemStreamPartition(systemName, streamName, new Partition(partitionId)); + Map<String, Interceptor> interceptors = new HashMap<>(); + interceptors.put(streamName, interceptor); + + // create EventData + List<EventData> singlePartitionEventData = MockEventData.generateEventData(numEvents); + eventData.put(ssp, singlePartitionEventData); + + // Set configs + Map<String, String> configMap = new HashMap<>(); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1); + + MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData); + + EventHubSystemConsumer consumer = + new EventHubSystemConsumer(new EventHubConfig(configMap), systemName, eventHubClientWrapperFactory, interceptors, + testMetrics); + consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM); + consumer.start(); + + // Mock received data from EventHub + eventHubClientWrapperFactory.sendToHandlers(consumer.streamPartitionHandlers); + + List<IncomingMessageEnvelope> result = consumer.poll(Collections.singleton(ssp), 1000).get(ssp); + + verifyEvents(result, singlePartitionEventData, interceptor); + Assert.assertEquals(testMetrics.getCounters(streamName).size(), 3); + Assert.assertEquals(testMetrics.getGauges(streamName).size(), 2); + Map<String, Counter> counters = + testMetrics.getCounters(streamName).stream().collect(Collectors.toMap(Counter::getName, Function.identity())); + + Assert.assertEquals(counters.get(EventHubSystemConsumer.EVENT_READ_RATE).getCount(), numEvents); + Assert.assertEquals(counters.get(EventHubSystemConsumer.READ_ERRORS).getCount(), 0); + } + + @Test public void testMultiPartitionConsumptionHappyPath() throws Exception { String systemName = "eventhubs"; String streamName = "testStream"; http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java new file mode 100644 index 0000000..912e087 --- /dev/null +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/SwapFirstLastByteInterceptor.java @@ -0,0 +1,36 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.system.eventhub.producer; + +import org.apache.samza.system.eventhub.Interceptor; + +public class SwapFirstLastByteInterceptor implements Interceptor { + + @Override + public byte[] intercept(byte[] bytes) { + // Swap first and last bytes + if (bytes.length < 2) + return bytes; + byte tmp = bytes[bytes.length - 1]; + bytes[bytes.length - 1] = bytes[0]; + bytes[0] = tmp; + return bytes; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/05113c33/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java index bf62e92..10016ec 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java @@ -102,6 +102,63 @@ public class TestEventHubSystemProducer { } @Test + public void testSendingToSpecificPartitionsWithInterceptor() throws Exception { + String systemName = "eventhubs"; + String streamName = "testStream"; + int numEvents = 10; + int partitionId0 = 0; + int partitionId1 = 1; + Interceptor interceptor = new SwapFirstLastByteInterceptor(); + + TestMetricsRegistry testMetrics = new TestMetricsRegistry(); + Map<String, Interceptor> interceptors = new HashMap<>(); + interceptors.put(streamName, interceptor); + + List<String> outgoingMessagesP0 = generateMessages(numEvents); + List<String> outgoingMessagesP1 = generateMessages(numEvents); + + // Set configs + Map<String, String> configMap = new HashMap<>(); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY); + configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1); + configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), + PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString()); + + MockEventHubClientManagerFactory factory = new MockEventHubClientManagerFactory(); + + EventHubSystemProducer producer = + new EventHubSystemProducer(new EventHubConfig(configMap), systemName, factory, interceptors, testMetrics); + + SystemStream systemStream = new SystemStream(systemName, streamName); + producer.register(streamName); + producer.start(); + + outgoingMessagesP0.forEach(message -> + producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId0, null, message.getBytes()))); + outgoingMessagesP1.forEach(message -> + producer.send(streamName, new OutgoingMessageEnvelope(systemStream, partitionId1, null, message.getBytes()))); + + // Retrieve sent data + List<String> receivedData0 = factory.getSentData(systemName, streamName, partitionId0) + .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList()); + List<String> receivedData1 = factory.getSentData(systemName, streamName, partitionId1) + .stream().map(eventData -> new String(eventData.getBytes())).collect(Collectors.toList()); + + List<String> expectedP0 = outgoingMessagesP0.stream() + .map(message -> new String(interceptor.intercept(message.getBytes()))) + .collect(Collectors.toList()); + List<String> expectedP1 = outgoingMessagesP1.stream() + .map(message -> new String(interceptor.intercept(message.getBytes()))) + .collect(Collectors.toList()); + + Assert.assertTrue(expectedP0.equals(receivedData0)); + Assert.assertTrue(expectedP1.equals(receivedData1)); + } + + @Test public void testSendingToEventHubHashing() throws Exception { String systemName = "eventhubs"; String streamName = "testStream";
