SAMZA-1282: Spinning up more containers than number of tasks. Changes:
* Stop the streamProcessor in the onNewJobModelAvailable eventHandler (instead of the onNewJobModelConfirmed eventHandler) when it is not part of the group, and prevent it from joining the barrier. * When numContainerIds > numTaskModels, generate the JobModel by choosing the lexicographically smallest `x` containerIds (where x = numTaskModels). * Added unit and integration tests in the appropriate classes to verify the expected behavior. Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Boris Shkolnik <bor...@apache.org>, Navina Ramesh <nav...@apache.org> Closes #244 from shanthoosh/more_processor_than_tasks Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/57758615 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/57758615 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/57758615 Branch: refs/heads/0.14.0 Commit: 57758615b3f8713364ac1afecab4f5355f64d1d4 Parents: 35143b6 Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Tue Jul 25 15:32:22 2017 -0700 Committer: navina <nav...@apache.org> Committed: Tue Jul 25 15:32:22 2017 -0700 ---------------------------------------------------------------------- .../grouper/task/GroupByContainerIds.java | 18 +++- .../apache/samza/processor/StreamProcessor.java | 91 +++++++++----------- .../org/apache/samza/zk/ZkJobCoordinator.java | 52 +++++------ .../samza/zk/ZkJobCoordinatorFactory.java | 22 ++++- .../grouper/task/TestGroupByContainerIds.java | 31 +++++-- .../apache/samza/zk/TestZkJobCoordinator.java | 47 ++++++++++ .../processor/TestZkLocalApplicationRunner.java | 78 ++++++++++++++++- 7 files changed, 248 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java index 651dca7..f5a5a86 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java @@ -20,6 +20,7 @@ package org.apache.samza.container.grouper.task; import java.util.Arrays; +import java.util.stream.Collectors; import org.apache.samza.container.TaskName; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; @@ -31,6 +32,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -39,6 +42,8 @@ import java.util.Set; * IDs as an argument. Please note - this first implementation ignores locality information. */ public class GroupByContainerIds implements TaskNameGrouper { + private static final Logger LOG = LoggerFactory.getLogger(GroupByContainerIds.class); + private final int startContainerCount; public GroupByContainerIds(int count) { this.startContainerCount = count; @@ -64,8 +69,17 @@ public class GroupByContainerIds implements TaskNameGrouper { throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays .toString(containersIds.toArray())); - if (containersIds.size() > tasks.size()) - throw new IllegalArgumentException("number of containers " + containersIds.size() + " is bigger than number of tasks " + tasks.size()); + if (containersIds.size() > tasks.size()) { + LOG.warn("Number of containers: {} is greater than number of tasks: {}.", containersIds.size(), tasks.size()); + /** + * Choose lexicographically least `x` containerIds(where x = tasks.size()). 
+ */ + containersIds = containersIds.stream() + .sorted() + .limit(tasks.size()) + .collect(Collectors.toList()); + LOG.info("Generating containerModel with containers: {}.", containersIds); + } int containerCount = containersIds.size(); http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 590fa11..415111f 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -244,64 +244,59 @@ public class StreamProcessor { @Override public void onNewJobModel(String processorId, JobModel jobModel) { - if (!jobModel.getContainers().containsKey(processorId)) { - LOGGER.warn("JobModel does not contain the processorId: " + processorId + ". Stopping the processor."); - stop(); - } else { - jcContainerShutdownLatch = new CountDownLatch(1); - - SamzaContainerListener containerListener = new SamzaContainerListener() { - @Override - public void onContainerStart() { - if (!processorOnStartCalled) { - // processorListener is called on start only the first time the container starts. - // It is not called after every re-balance of partitions among the processors - processorOnStartCalled = true; - if (processorListener != null) { - processorListener.onStart(); - } - } else { - LOGGER.debug("StreamProcessorListener was notified of container start previously. 
Hence, skipping this time."); - } - } + jcContainerShutdownLatch = new CountDownLatch(1); - @Override - public void onContainerStop(boolean pauseByJm) { - if (pauseByJm) { - LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator."); - if (jcContainerShutdownLatch != null) { - jcContainerShutdownLatch.countDown(); - } - } else { // sp.stop was called or container stopped by itself - LOGGER.info("Container " + container.toString() + " stopped."); - container = null; // this guarantees that stop() doesn't try to stop container again - stop(); + SamzaContainerListener containerListener = new SamzaContainerListener() { + @Override + public void onContainerStart() { + if (!processorOnStartCalled) { + // processorListener is called on start only the first time the container starts. + // It is not called after every re-balance of partitions among the processors + processorOnStartCalled = true; + if (processorListener != null) { + processorListener.onStart(); } + } else { + LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time."); } + } - @Override - public void onContainerFailed(Throwable t) { + @Override + public void onContainerStop(boolean pauseByJm) { + if (pauseByJm) { + LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator."); if (jcContainerShutdownLatch != null) { jcContainerShutdownLatch.countDown(); - } else { - LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting."); } - containerException = t; - LOGGER.error("Container failed. 
Stopping the processor.", containerException); - container = null; + } else { // sp.stop was called or container stopped by itself + LOGGER.info("Container " + container.toString() + " stopped."); + container = null; // this guarantees that stop() doesn't try to stop container again stop(); } - }; + } - container = createSamzaContainer( - jobModel.getContainers().get(processorId), - jobModel.maxChangeLogStreamPartitions); - container.setContainerListener(containerListener); - LOGGER.info("Starting container " + container.toString()); - executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("p-" + processorId + "-container-thread-%d").build()); - executorService.submit(container::run); - } + @Override + public void onContainerFailed(Throwable t) { + if (jcContainerShutdownLatch != null) { + jcContainerShutdownLatch.countDown(); + } else { + LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting."); + } + containerException = t; + LOGGER.error("Container failed. 
Stopping the processor.", containerException); + container = null; + stop(); + } + }; + + container = createSamzaContainer( + jobModel.getContainers().get(processorId), + jobModel.maxChangeLogStreamPartitions); + container.setContainerListener(containerListener); + LOGGER.info("Starting container " + container.toString()); + executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("p-" + processorId + "-container-thread-%d").build()); + executorService.submit(container::run); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index e973099..2204240 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -25,7 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.ZkClient; import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.ApplicationConfig; @@ -77,31 +76,29 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private JobModel newJobModel; private int debounceTimeMs; - public ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry) { + ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) { this.config = config; - ZkConfig zkConfig = new ZkConfig(config); - ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId()); - ZkClient zkClient = ZkCoordinationServiceFactory - .createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - - // 
setup a listener for a session state change - // we are mostly interested in "session closed" and "new session created" events - zkClient.subscribeStateChanges(new ZkSessionStateChangedListener()); this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry); - this.zkUtils = new ZkUtils( - keyBuilder, - zkClient, - zkConfig.getZkConnectionTimeoutMs(), metricsRegistry); this.processorId = createProcessorId(config); + this.zkUtils = zkUtils; + // setup a listener for a session state change + // we are mostly interested in "session closed" and "new session created" events + zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener()); LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils); leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl()); this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector); - this.barrier = new ZkBarrierForVersionUpgrade(keyBuilder.getJobModelVersionBarrierPrefix(), zkUtils, + this.barrier = new ZkBarrierForVersionUpgrade( + zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), + zkUtils, new ZkBarrierListenerImpl()); this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs(); this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId); + debounceTimer = new ScheduleAfterDebounceTime(throwable -> { + LOG.error("Received exception from in JobCoordinator Processing!", throwable); + stop(); + }); } @Override @@ -109,12 +106,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { startMetrics(); streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config); - debounceTimer = new ScheduleAfterDebounceTime(throwable -> - { - LOG.error("Received exception from in JobCoordinator Processing!", throwable); - stop(); - }); - zkController.register(); } @@ -212,18 +203,21 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { 
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> { LOG.info("pid=" + processorId + "new JobModel available"); - - // stop current work - if (coordinatorListener != null) { - coordinatorListener.onJobModelExpired(); - } // get the new job model from ZK newJobModel = zkUtils.getJobModel(version); - LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel); - // update ZK and wait for all the processors to get this new version - barrier.join(version, processorId); + if (!newJobModel.getContainers().containsKey(processorId)) { + LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", newJobModel, processorId); + stop(); + } else { + // stop current work + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } + // update ZK and wait for all the processors to get this new version + barrier.join(version, processorId); + } }); } http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index c077f94..85e3b4a 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -19,13 +19,21 @@ package org.apache.samza.zk; +import org.I0Itec.zkclient.ZkClient; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsRegistryMap; - +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { + + private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class); + /** * Method to instantiate an implementation of JobCoordinator * @@ -34,6 +42,16 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { */ @Override public JobCoordinator getJobCoordinator(Config config) { - return new ZkJobCoordinator(config, new MetricsRegistryMap()); + MetricsRegistry metricsRegistry = new MetricsRegistryMap(); + ZkUtils zkUtils = getZkUtils(config, metricsRegistry); + LOG.debug("Creating ZkJobCoordinator instance with config: {}.", config); + return new ZkJobCoordinator(config, metricsRegistry, zkUtils); + } + + private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) { + ZkConfig zkConfig = new ZkConfig(config); + ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId()); + ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry); } } http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java index cd2cc3d..13afeef 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java @@ -19,6 +19,8 @@ package org.apache.samza.container.grouper.task; +import 
com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -26,16 +28,17 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.container.LocalityManager; +import org.apache.samza.container.TaskName; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; import org.junit.Before; import org.junit.Test; import static org.apache.samza.container.mock.ContainerMocks.generateTaskModels; -import static org.apache.samza.container.mock.ContainerMocks.getTaskModel; import static org.apache.samza.container.mock.ContainerMocks.getTaskName; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -73,13 +76,6 @@ public class TestGroupByContainerIds { buildSimpleGrouper(1).group(new HashSet()); } - @Test(expected = IllegalArgumentException.class) - public void testGroupFewerTasksThanContainers() { - Set<TaskModel> taskModels = new HashSet<>(); - taskModels.add(getTaskModel(1)); - buildSimpleGrouper(2).group(taskModels); - } - @Test(expected = UnsupportedOperationException.class) public void testGrouperResultImmutable() { Set<TaskModel> taskModels = generateTaskModels(3); @@ -237,4 +233,23 @@ public class TestGroupByContainerIds { assertTrue(container1.getTasks().containsKey(getTaskName(6))); assertTrue(container1.getTasks().containsKey(getTaskName(8))); } + + @Test + public void testFewerTasksThanContainers() { + final String testContainerId1 = "1"; + final String testContainerId2 = "2"; + final int testProcessorId = 1; + + Set<TaskModel> taskModels = generateTaskModels(1); + List<String> containerIds = ImmutableList.of(testContainerId1, testContainerId2); + + Map<TaskName, TaskModel> expectedTasks = 
taskModels.stream() + .collect(Collectors.toMap(TaskModel::getTaskName, x -> x)); + ContainerModel expectedContainerModel = new ContainerModel(testContainerId1, testProcessorId, expectedTasks); + + Set<ContainerModel> actualContainerModels = buildSimpleGrouper().group(taskModels, containerIds); + + assertEquals(1, actualContainerModels.size()); + assertEquals(ImmutableSet.of(expectedContainerModel), actualContainerModels); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java new file mode 100644 index 0000000..9b5210f --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.zk; + +import java.util.HashMap; +import org.apache.samza.config.MapConfig; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestZkJobCoordinator { + private static final String TEST_BARRIER_ROOT = "/testBarrierRoot"; + private static final String TEST_JOB_MODEL_VERSION = "1"; + + @Test + public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() { + ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class); + Mockito.when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT); + + ZkUtils zkUtils = Mockito.mock(ZkUtils.class); + Mockito.when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder); + Mockito.when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>())); + + ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); + zkJobCoordinator.onNewJobModelAvailable(TEST_JOB_MODEL_VERSION); + + Mockito.doNothing().when(zkJobCoordinator).stop(); + Mockito.verify(zkJobCoordinator, Mockito.atMost(1)).stop(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index ebbe07b..cf0a242 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.atomic.AtomicBoolean; import kafka.admin.AdminUtils; import kafka.utils.TestUtils; import org.I0Itec.zkclient.ZkClient; @@ -96,6 +97,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne private LocalApplicationRunner applicationRunner1; private LocalApplicationRunner applicationRunner2; private LocalApplicationRunner applicationRunner3; + private String testStreamAppName; + private String testStreamAppId; // Set 90 seconds as max execution time for each test. @Rule @@ -108,8 +111,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne public void setUp() { super.setUp(); String uniqueTestId = UUID.randomUUID().toString(); - String testStreamAppName = String.format("test-app-name-%s", uniqueTestId); - String testStreamAppId = String.format("test-app-id-%s", uniqueTestId); + testStreamAppName = String.format("test-app-name-%s", uniqueTestId); + testStreamAppId = String.format("test-app-id-%s", uniqueTestId); inputKafkaTopic = String.format("test-input-topic-%s", uniqueTestId); outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId); ZkClient zkClient = new ZkClient(zkConnect()); @@ -179,6 +182,77 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne } @Test + public void shouldStopNewProcessorsJoiningGroupWhenNumContainersIsGreaterThanNumTasks() throws InterruptedException { + /** + * sspGrouper is set to AllSspToSingleTaskGrouperFactory for this test case(All ssp's from input kafka topic are mapped to a single task). + * Run a stream application(streamApp1) consuming messages from input topic(effectively one container). + * + * In the callback triggered by streamApp1 after processing a message, bring up an another stream application(streamApp2). + * + * Assertions: + * A) JobModel generated before and after the addition of streamApp2 should be equal. 
+ * B) Second stream application(streamApp2) should not join the group and process any message. + */ + + // Set up kafka topics. + publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS * 2, PROCESSOR_IDS[0]); + + // Configuration, verification variables + MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10")); + // Declared as final array to update it from streamApplication callback(Variable should be declared final to access in lambda block). + final JobModel[] previousJobModel = new JobModel[1]; + final String[] previousJobModelVersion = new String[1]; + AtomicBoolean hasSecondProcessorJoined = new AtomicBoolean(false); + final CountDownLatch secondProcessorRegistered = new CountDownLatch(1); + + zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> { + // When streamApp2 with id: PROCESSOR_IDS[1] is registered, start processing message in streamApp1. + if (currentChilds.contains(PROCESSOR_IDS[1])) { + secondProcessorRegistered.countDown(); + } + }); + + // Set up stream app 2. + CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS); + LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig)); + StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null); + + // Callback handler for streamApp1. + StreamApplicationCallback streamApplicationCallback = message -> { + if (hasSecondProcessorJoined.compareAndSet(false, true)) { + previousJobModelVersion[0] = zkUtils.getJobModelVersion(); + previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]); + localApplicationRunner2.run(streamApp2); + try { + // Wait for streamApp2 to register with zookeeper. 
+ secondProcessorRegistered.await(); + } catch (InterruptedException e) { + } + } + }; + + CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2); + + // Set up stream app 1. + LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig)); + StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, streamApplicationCallback, kafkaEventsConsumedLatch); + localApplicationRunner1.run(streamApp1); + + kafkaEventsConsumedLatch.await(); + + String currentJobModelVersion = zkUtils.getJobModelVersion(); + JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion); + + // JobModelVersion check to verify that leader publishes new jobModel. + assertTrue(Integer.parseInt(previousJobModelVersion[0]) < Integer.parseInt(currentJobModelVersion)); + // Job model before and after the addition of second stream processor should be the same. + assertEquals(previousJobModel[0], updatedJobModel); + // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp) + // ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value. + assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount()); + } + + @Test public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException { // Set up kafka topics. publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);