Repository: samza Updated Branches: refs/heads/master cb92cf18b -> 7887b6d86
SAMZA-1502; Make AllSspToSingleTaskGrouper work with Yarn and ZK JobCoordinator Sending a fresh review as I lost the earlier diffs. This is the new approach that we discussed by adding the processor list in the config and passing it to grouper. Author: Aditya Toomula <[email protected]> Reviewers: Yi Pan <[email protected]>, Shanthoosh V <[email protected]> Closes #383 from atoomula/samza Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7887b6d8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7887b6d8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7887b6d8 Branch: refs/heads/master Commit: 7887b6d868ee048767c11c2db3a5d38093d9abf9 Parents: cb92cf1 Author: Aditya Toomula <[email protected]> Authored: Tue Dec 12 21:41:37 2017 -0800 Committer: Jagadish <[email protected]> Committed: Tue Dec 12 21:41:37 2017 -0800 ---------------------------------------------------------------------- .../AllSspToSingleTaskGrouperFactory.java | 50 ++++-- .../standalone/PassthroughJobCoordinator.java | 6 +- .../org/apache/samza/config/JobConfig.scala | 1 + .../samza/coordinator/JobModelManager.scala | 21 ++- .../stream/TestAllSspToSingleTaskGrouper.java | 125 +++++++++++++++ .../processor/TestZkLocalApplicationRunner.java | 152 ++++++++++++++++--- 6 files changed, 315 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java index 2d22977..d3c5080 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java @@ -19,35 +19,40 @@ package org.apache.samza.container.grouper.stream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.samza.SamzaException; import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; +import org.apache.samza.config.TaskConfigJava; import org.apache.samza.container.TaskName; import org.apache.samza.system.SystemStreamPartition; -import java.util.Collections; -import java.util.Map; -import java.util.Set; /** - * AllSspToSingleTaskGrouper, as the name suggests, assigns all partitions to be consumed by a single TaskInstance - * This is useful, in case of using load-balanced consumers like the new Kafka consumer, Samza doesn't control the - * partitions being consumed by a task. Hence, it is assumed that there is only 1 task that processes all messages, - * irrespective of which partition it belongs to. - * This also implies that container and tasks are synonymous when this grouper is used. Taskname(s) has to be globally - * unique within a given job. + * AllSspToSingleTaskGrouper creates TaskInstances equal to the number of containers and assigns all partitions to be + * consumed by each TaskInstance. This is useful, in case of using load-balanced consumers like the high-level Kafka + * consumer and Kinesis consumer, where Samza doesn't control the partitions being consumed by the task. * - * Note: This grouper does not take in broadcast streams yet. + * Note that this grouper does not take in broadcast streams yet. */ + class AllSspToSingleTaskGrouper implements SystemStreamPartitionGrouper { - private final int containerId; + private final List<String> processorList; - public AllSspToSingleTaskGrouper(int containerId) { - this.containerId = containerId; + public AllSspToSingleTaskGrouper(List<String> processorList) { + this.processorList = processorList; } @Override public Map<TaskName, Set<SystemStreamPartition>> group(final Set<SystemStreamPartition> ssps) { + Map<TaskName, Set<SystemStreamPartition>> groupedMap = new HashMap<>(); + if (ssps == null) { throw new SamzaException("ssp set cannot be null!"); } @@ -55,15 +60,28 @@ class AllSspToSingleTaskGrouper implements SystemStreamPartitionGrouper { throw new SamzaException("Cannot process stream task with no input system stream partitions"); } - final TaskName taskName = new TaskName(String.format("Task-%s", String.valueOf(containerId))); + processorList.forEach(processor -> { + // Create a task name for each processor and assign all partitions to each task name. + final TaskName taskName = new TaskName(String.format("Task-%s", processor)); + groupedMap.put(taskName, ssps); + }); - return Collections.singletonMap(taskName, ssps); + return groupedMap; } } public class AllSspToSingleTaskGrouperFactory implements SystemStreamPartitionGrouperFactory { @Override public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) { - return new AllSspToSingleTaskGrouper(config.getInt(JobConfig.PROCESSOR_ID())); + if (!(new TaskConfigJava(config).getBroadcastSystemStreams().isEmpty())) { + throw new ConfigException("The job configured with AllSspToSingleTaskGrouper cannot have broadcast streams."); + } + + String processors = config.get(JobConfig.PROCESSOR_LIST()); + List<String> processorList = Arrays.asList(processors.split(",")); + if (processorList.isEmpty()) { + throw new SamzaException("processor list cannot be empty!"); + } + return new AllSspToSingleTaskGrouper(processorList); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java index 87a1cfa..5147169 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java @@ -23,6 +23,7 @@ import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.config.JobConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.job.model.JobModel; @@ -125,13 +126,16 @@ public class PassthroughJobCoordinator implements JobCoordinator { StreamMetadataCache streamMetadataCache = new StreamMetadataCache( Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance()); + String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID())); + /** TODO: Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also, in SamzaContainer for writing locality info to the coordinator stream. This closely couples together TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper) */ - return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null); + return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, + Collections.singletonList(containerId)); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 083dbaf..de83919 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -79,6 +79,7 @@ object JobConfig { // Processor Config Constants val PROCESSOR_ID = "processor.id" + val PROCESSOR_LIST = "processor.list" implicit def Config2Job(config: Config) = new JobConfig(config) http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index c2e0665..99b1abe 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -24,7 +24,9 @@ import java.util import java.util.concurrent.atomic.AtomicReference import org.apache.samza.config.ClusterManagerConfig +import org.apache.samza.config.JobConfig import org.apache.samza.config.JobConfig.Config2Job +import org.apache.samza.config.MapConfig import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.config.Config @@ -49,6 +51,7 @@ import org.apache.samza.util.Util import org.apache.samza.{Partition, PartitionChangeException, SamzaException} import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer /** * Helper companion object that is responsible for wiring up a JobModelManager @@ -104,7 +107,15 @@ object JobModelManager extends Logging { val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0) val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping() - val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, null) + + val processorList = new ListBuffer[String]() + val containerCount = new JobConfig(config).getContainerCount + for (i <- 0 until containerCount) { + processorList += i.toString + } + + val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, + streamMetadataCache, processorList.toList.asJava) val jobModel = jobModelManager.jobModel // Save the changelog mapping back to the ChangelogPartitionmanager // newChangelogPartitionMapping is the merging of all current task:changelog @@ -211,7 +222,13 @@ object JobModelManager extends Logging { containerIds: java.util.List[String]): JobModel = { // Do grouping to fetch TaskName to SSP mapping val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache) - val grouper = getSystemStreamPartitionGrouper(config) + + // processor list is required by some of the groupers. So, let's pass them as part of the config. + // Copy the config and add the processor list to the config copy. + val configMap = new util.HashMap[String, String](config) + configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", containerIds)) + val grouper = getSystemStreamPartitionGrouper(new MapConfig(configMap)) + val groups = grouper.group(allSystemStreamPartitions.asJava) info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups.keySet())) http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestAllSspToSingleTaskGrouper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestAllSspToSingleTaskGrouper.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestAllSspToSingleTaskGrouper.java new file mode 100644 index 0000000..fa3b33f --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestAllSspToSingleTaskGrouper.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container.grouper.stream; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestAllSspToSingleTaskGrouper { + private SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0)); + private SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1)); + private SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2)); + private SystemStreamPartition ab0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0)); + private AllSspToSingleTaskGrouperFactory grouperFactory = new AllSspToSingleTaskGrouperFactory(); + + @Test + public void testLocalStreamGroupedCorrectlyForYarn() { + HashSet<SystemStreamPartition> allSSPs = new HashSet<>(); + HashMap<String, String> configMap = new HashMap<>(); + + configMap.put("job.container.count", "2"); + configMap.put("processor.list", "0,1"); + + Config config = new MapConfig(configMap); + + SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config); + + Collections.addAll(allSSPs, aa0, aa1, aa2, ab0); + Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs); + Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<>(); + + HashSet<SystemStreamPartition> partitions = new HashSet<>(); + partitions.add(aa0); + partitions.add(aa1); + partitions.add(aa2); + partitions.add(ab0); + expectedResult.put(new TaskName("Task-0"), partitions); + expectedResult.put(new TaskName("Task-1"), partitions); + + assertEquals(expectedResult, result); + } + + @Test + public void testLocalStreamGroupedCorrectlyForPassthru() { + HashSet<SystemStreamPartition> allSSPs = new HashSet<>(); + HashMap<String, String> configMap = new HashMap<>(); + + configMap.put("job.coordinator.factory", "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); + configMap.put("processor.id", "1"); + configMap.put("processor.list", configMap.get("processor.id")); + + Config config = new MapConfig(configMap); + + SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config); + + Collections.addAll(allSSPs, aa0, aa1, aa2, ab0); + Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs); + Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<>(); + + HashSet<SystemStreamPartition> partitions = new HashSet<>(); + partitions.add(aa0); + partitions.add(aa1); + partitions.add(aa2); + partitions.add(ab0); + expectedResult.put(new TaskName("Task-1"), partitions); + + assertEquals(expectedResult, result); + } + + @Test(expected = SamzaException.class) + public void testLocalStreamWithEmptySsps() { + HashSet<SystemStreamPartition> allSSPs = new HashSet<>(); + HashMap<String, String> configMap = new HashMap<>(); + + configMap.put("job.coordinator.factory", "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); + configMap.put("processor.list", "1"); + Config config = new MapConfig(configMap); + + SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config); + + grouper.group(allSSPs); + } + + @Test(expected = ConfigException.class) + public void testLocalStreamWithBroadcastStream() { + HashMap<String, String> configMap = new HashMap<>(); + + configMap.put("task.broadcast.inputs", "test.stream#0"); + configMap.put("processor.list", "1"); + Config config = new MapConfig(configMap); + + grouperFactory.getSystemStreamPartitionGrouper(config); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index eb087bb..9c5dad5 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -40,8 +40,10 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.ZkConfig; +import org.apache.samza.container.TaskName; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraph; @@ -97,6 +99,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne private String inputKafkaTopic; private String outputKafkaTopic; + private String inputSinglePartitionKafkaTopic; + private String outputSinglePartitionKafkaTopic; private ZkUtils zkUtils; private ApplicationConfig applicationConfig1; private ApplicationConfig applicationConfig2; @@ -113,7 +117,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne @Rule public final ExpectedException expectedException = ExpectedException.none(); -// @Override + // @Override public void setUp() { super.setUp(); String uniqueTestId = UUID.randomUUID().toString(); @@ -121,6 +125,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne testStreamAppId = String.format("test-app-id-%s", uniqueTestId); inputKafkaTopic = String.format("test-input-topic-%s", uniqueTestId); outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId); + inputSinglePartitionKafkaTopic = String.format("test-input-single-partition-topic-%s", uniqueTestId); + outputSinglePartitionKafkaTopic = String.format("test-output-single-partition-topic-%s", uniqueTestId); // Set up stream application config map with the given testStreamAppName, testStreamAppId and test kafka system // TODO: processorId should typically come up from a processorID generator as processor.id will be deprecated in 0.14.0+ @@ -147,15 +153,23 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne LOGGER.info("Creating kafka topic: {}.", kafkaTopic); TestUtils.createTopic(zkUtils(), kafkaTopic, 5, 1, servers(), new Properties()); } + for (String kafkaTopic : ImmutableList.of(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic)) { + LOGGER.info("Creating kafka topic: {}.", kafkaTopic); + TestUtils.createTopic(zkUtils(), kafkaTopic, 1, 1, servers(), new Properties()); + } } -// @Override + // @Override public void tearDown() { if (zookeeper().zookeeper().isRunning()) { for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) { LOGGER.info("Deleting kafka topic: {}.", kafkaTopic); AdminUtils.deleteTopic(zkUtils(), kafkaTopic); } + for (String kafkaTopic : ImmutableList.of(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic)) { + LOGGER.info("Deleting kafka topic: {}.", kafkaTopic); + AdminUtils.deleteTopic(zkUtils(), kafkaTopic); + } zkUtils.close(); super.tearDown(); } @@ -175,7 +189,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne } private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic, - String appName, String appId) { + String appName, String appId) { Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder() .put(TaskConfig.INPUT_STREAMS(), inputTopic) .put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName) @@ -197,24 +211,25 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne return applicationConfig; } + /** + * sspGrouper is set to GroupBySystemStreamPartitionFactory. + * Run a stream application(streamApp1) consuming messages from input topic(effectively one container). + * + * In the callback triggered by streamApp1 after processing a message, bring up an another stream application(streamApp2). + * + * Assertions: + * A) JobModel generated before and after the addition of streamApp2 should be equal. + * B) Second stream application(streamApp2) should not join the group and process any message. + */ + //@Test public void shouldStopNewProcessorsJoiningGroupWhenNumContainersIsGreaterThanNumTasks() throws InterruptedException { - /** - * sspGrouper is set to AllSspToSingleTaskGrouperFactory for this test case(All ssp's from input kafka topic are mapped to a single task). - * Run a stream application(streamApp1) consuming messages from input topic(effectively one container). - * - * In the callback triggered by streamApp1 after processing a message, bring up an another stream application(streamApp2). - * - * Assertions: - * A) JobModel generated before and after the addition of streamApp2 should be equal. - * B) Second stream application(streamApp2) should not join the group and process any message. - */ - // Set up kafka topics. - publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS * 2, PROCESSOR_IDS[0]); + publishKafkaEvents(inputSinglePartitionKafkaTopic, NUM_KAFKA_EVENTS * 2, PROCESSOR_IDS[0]); // Configuration, verification variables - MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10")); + MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), + "org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10")); // Declared as final array to update it from streamApplication callback(Variable should be declared final to access in lambda block). final JobModel[] previousJobModel = new JobModel[1]; final String[] previousJobModelVersion = new String[1]; @@ -231,7 +246,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Set up stream app 2. CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS); LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig)); - StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null); + StreamApplication streamApp2 = new TestStreamApplication(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, + processedMessagesLatch, null, null); // Callback handler for streamApp1. StreamApplicationCallback streamApplicationCallback = message -> { @@ -251,7 +267,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Set up stream app 1. LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig)); - StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, streamApplicationCallback, kafkaEventsConsumedLatch); + StreamApplication streamApp1 = new TestStreamApplication(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, + null, streamApplicationCallback, kafkaEventsConsumedLatch); localApplicationRunner1.run(streamApp1); kafkaEventsConsumedLatch.await(); @@ -268,6 +285,99 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount()); } + /** + * sspGrouper is set to AllSspToSingleTaskGrouperFactory (All ssps from input kafka topic are mapped to a single task per container). + * AllSspToSingleTaskGrouperFactory should be used only with high-level consumers which do the partition management + * by themselves. Using the factory with the consumers that do not do the partition management will result in + * each processor/task consuming all the messages from all the partitions. + * Run a stream application(streamApp1) consuming messages from input topic(effectively one container). + * + * In the callback triggered by streamApp1 after processing a message, bring up an another stream application(streamApp2). + * + * Assertions: + * A) JobModel generated before and after the addition of streamApp2 should not be equal. + * B) Second stream application(streamApp2) should join the group and process all the messages. + */ + + //@Test + public void shouldUpdateJobModelWhenNewProcessorJoiningGroupUsingAllSspToSingleTaskGrouperFactory() throws InterruptedException { + // Set up kafka topics. + publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS * 2, PROCESSOR_IDS[0]); + + // Configuration, verification variables + MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10")); + // Declared as final array to update it from streamApplication callback(Variable should be declared final to access in lambda block). + final JobModel[] previousJobModel = new JobModel[1]; + final String[] previousJobModelVersion = new String[1]; + AtomicBoolean hasSecondProcessorJoined = new AtomicBoolean(false); + final CountDownLatch secondProcessorRegistered = new CountDownLatch(1); + + zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> { + // When streamApp2 with id: PROCESSOR_IDS[1] is registered, start processing message in streamApp1. + if (currentChilds.contains(PROCESSOR_IDS[1])) { + secondProcessorRegistered.countDown(); + } + }); + + // Set up streamApp2. + CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2); + LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig)); + StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null); + + // Callback handler for streamApp1. + StreamApplicationCallback streamApplicationCallback = message -> { + if (hasSecondProcessorJoined.compareAndSet(false, true)) { + previousJobModelVersion[0] = zkUtils.getJobModelVersion(); + previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]); + localApplicationRunner2.run(streamApp2); + try { + // Wait for streamApp2 to register with zookeeper. + secondProcessorRegistered.await(); + } catch (InterruptedException e) { + } + } + }; + + // This is the latch for the messages received by streamApp1. Since streamApp1 is run first, it gets one event + // redelivered due to re-balancing done by Zk after the streamApp2 joins (See the callback above). + CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2 + 1); + + // Set up stream app 1. + LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig)); + StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, + streamApplicationCallback, kafkaEventsConsumedLatch); + localApplicationRunner1.run(streamApp1); + + kafkaEventsConsumedLatch.await(); + + String currentJobModelVersion = zkUtils.getJobModelVersion(); + JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion); + + // JobModelVersion check to verify that leader publishes new jobModel. + assertTrue(Integer.parseInt(previousJobModelVersion[0]) < Integer.parseInt(currentJobModelVersion)); + + // Job model before and after the addition of second stream processor should not be the same. + assertTrue(!previousJobModel[0].equals(updatedJobModel)); + + // Task names in the job model should be different but the set of partitions should be the same and each task name + // should be assigned to a different container. + assertEquals(previousJobModel[0].getContainers().get(PROCESSOR_IDS[0]).getTasks().size(), 1); + assertEquals(updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks().size(), 1); + assertEquals(updatedJobModel.getContainers().get(PROCESSOR_IDS[1]).getTasks().size(), 1); + Map<TaskName, TaskModel> updatedTaskModelMap1 = updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks(); + Map<TaskName, TaskModel> updatedTaskModelMap2 = updatedJobModel.getContainers().get(PROCESSOR_IDS[1]).getTasks(); + assertEquals(updatedTaskModelMap1.size(), 1); + assertEquals(updatedTaskModelMap2.size(), 1); + + TaskModel taskModel1 = updatedTaskModelMap1.values().stream().findFirst().get(); + TaskModel taskModel2 = updatedTaskModelMap2.values().stream().findFirst().get(); + assertEquals(taskModel1.getSystemStreamPartitions(), taskModel2.getSystemStreamPartitions()); + assertTrue(!taskModel1.getTaskName().getTaskName().equals(taskModel2.getTaskName().getTaskName())); + + // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp) + processedMessagesLatch.await(); + } + //@Test public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException { // Set up kafka topics. @@ -495,7 +605,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne } static TestKafkaEvent fromString(String message) { - String[] messageComponents = message.split("|"); + String[] messageComponents = message.split("\\|"); return new TestKafkaEvent(messageComponents[0], messageComponents[1]); } } @@ -513,8 +623,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne private final CountDownLatch kafkaEventsConsumedLatch; TestStreamApplication(String inputTopic, String outputTopic, - CountDownLatch processedMessagesLatch, - StreamApplicationCallback streamApplicationCallback, CountDownLatch kafkaEventsConsumedLatch) { + CountDownLatch processedMessagesLatch, + StreamApplicationCallback streamApplicationCallback, CountDownLatch kafkaEventsConsumedLatch) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; this.processedMessagesLatch = processedMessagesLatch;
