Repository: samza
Updated Branches:
  refs/heads/master cb92cf18b -> 7887b6d86


SAMZA-1502; Make AllSspToSingleTaskGrouper work with Yarn and ZK JobCoordinator

Sending a fresh review as I lost the earlier diffs. This is the new approach 
that we discussed: adding the processor list to the config and passing it to 
the grouper.

Author: Aditya Toomula <[email protected]>

Reviewers: Yi Pan <[email protected]>, Shanthoosh V 
<[email protected]>

Closes #383 from atoomula/samza


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7887b6d8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7887b6d8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7887b6d8

Branch: refs/heads/master
Commit: 7887b6d868ee048767c11c2db3a5d38093d9abf9
Parents: cb92cf1
Author: Aditya Toomula <[email protected]>
Authored: Tue Dec 12 21:41:37 2017 -0800
Committer: Jagadish <[email protected]>
Committed: Tue Dec 12 21:41:37 2017 -0800

----------------------------------------------------------------------
 .../AllSspToSingleTaskGrouperFactory.java       |  50 ++++--
 .../standalone/PassthroughJobCoordinator.java   |   6 +-
 .../org/apache/samza/config/JobConfig.scala     |   1 +
 .../samza/coordinator/JobModelManager.scala     |  21 ++-
 .../stream/TestAllSspToSingleTaskGrouper.java   | 125 +++++++++++++++
 .../processor/TestZkLocalApplicationRunner.java | 152 ++++++++++++++++---
 6 files changed, 315 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
index 2d22977..d3c5080 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
@@ -19,35 +19,40 @@
 
 package org.apache.samza.container.grouper.stream;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
 
 /**
- * AllSspToSingleTaskGrouper, as the name suggests, assigns all partitions to 
be consumed by a single TaskInstance
- * This is useful, in case of using load-balanced consumers like the new Kafka 
consumer, Samza doesn't control the
- * partitions being consumed by a task. Hence, it is assumed that there is 
only 1 task that processes all messages,
- * irrespective of which partition it belongs to.
- * This also implies that container and tasks are synonymous when this grouper 
is used. Taskname(s) has to be globally
- * unique within a given job.
+ * AllSspToSingleTaskGrouper creates TaskInstances equal to the number of 
containers and assigns all partitions to be
+ * consumed by each TaskInstance. This is useful, in case of using 
load-balanced consumers like the high-level Kafka
+ * consumer and Kinesis consumer, where Samza doesn't control the partitions 
being consumed by the task.
  *
- * Note: This grouper does not take in broadcast streams yet.
+ * Note that this grouper does not take in broadcast streams yet.
  */
+
 class AllSspToSingleTaskGrouper implements SystemStreamPartitionGrouper {
-  private final int containerId;
+  private final List<String> processorList;
 
-  public AllSspToSingleTaskGrouper(int containerId) {
-    this.containerId = containerId;
+  public AllSspToSingleTaskGrouper(List<String> processorList) {
+    this.processorList = processorList;
   }
 
   @Override
   public Map<TaskName, Set<SystemStreamPartition>> group(final 
Set<SystemStreamPartition> ssps) {
+    Map<TaskName, Set<SystemStreamPartition>> groupedMap = new HashMap<>();
+
     if (ssps == null) {
       throw new SamzaException("ssp set cannot be null!");
     }
@@ -55,15 +60,28 @@ class AllSspToSingleTaskGrouper implements 
SystemStreamPartitionGrouper {
       throw new SamzaException("Cannot process stream task with no input 
system stream partitions");
     }
 
-    final TaskName taskName = new TaskName(String.format("Task-%s", 
String.valueOf(containerId)));
+    processorList.forEach(processor -> {
+        // Create a task name for each processor and assign all partitions to 
each task name.
+        final TaskName taskName = new TaskName(String.format("Task-%s", 
processor));
+        groupedMap.put(taskName, ssps);
+      });
 
-    return Collections.singletonMap(taskName, ssps);
+    return groupedMap;
   }
 }
 
 public class AllSspToSingleTaskGrouperFactory implements 
SystemStreamPartitionGrouperFactory {
   @Override
   public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config 
config) {
-    return new 
AllSspToSingleTaskGrouper(config.getInt(JobConfig.PROCESSOR_ID()));
+    if (!(new TaskConfigJava(config).getBroadcastSystemStreams().isEmpty())) {
+      throw new ConfigException("The job configured with 
AllSspToSingleTaskGrouper cannot have broadcast streams.");
+    }
+
+    String processors = config.get(JobConfig.PROCESSOR_LIST());
+    List<String> processorList = Arrays.asList(processors.split(","));
+    if (processorList.isEmpty()) {
+      throw new SamzaException("processor list cannot be empty!");
+    }
+    return new AllSspToSingleTaskGrouper(processorList);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 87a1cfa..5147169 100644
--- 
a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -23,6 +23,7 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
@@ -125,13 +126,16 @@ public class PassthroughJobCoordinator implements 
JobCoordinator {
     StreamMetadataCache streamMetadataCache = new StreamMetadataCache(
         Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, 
SystemClock.instance());
 
+    String containerId = 
Integer.toString(config.getInt(JobConfig.PROCESSOR_ID()));
+
     /** TODO:
      Locality Manager seems to be required in JC for reading locality info and 
grouping tasks intelligently and also,
      in SamzaContainer for writing locality info to the coordinator stream. 
This closely couples together
      TaskNameGrouper with the LocalityManager! Hence, groupers should be a 
property of the jobcoordinator
      (job.coordinator.task.grouper, instead of 
task.systemstreampartition.grouper)
      */
-    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), 
null, streamMetadataCache, null);
+    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), 
null, streamMetadataCache,
+        Collections.singletonList(containerId));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 083dbaf..de83919 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -79,6 +79,7 @@ object JobConfig {
 
   // Processor Config Constants
   val PROCESSOR_ID = "processor.id"
+  val PROCESSOR_LIST = "processor.list"
 
   implicit def Config2Job(config: Config) = new JobConfig(config)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index c2e0665..99b1abe 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -24,7 +24,9 @@ import java.util
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.samza.config.ClusterManagerConfig
+import org.apache.samza.config.JobConfig
 import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.MapConfig
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config.Config
@@ -49,6 +51,7 @@ import org.apache.samza.util.Util
 import org.apache.samza.{Partition, PartitionChangeException, SamzaException}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 
 /**
  * Helper companion object that is responsible for wiring up a JobModelManager
@@ -104,7 +107,15 @@ object JobModelManager extends Logging {
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins = 
systemAdmins, cacheTTLms = 0)
     val previousChangelogPartitionMapping = 
changelogManager.readChangeLogPartitionMapping()
-    val jobModelManager = getJobModelManager(config, 
previousChangelogPartitionMapping, localityManager, streamMetadataCache, null)
+
+    val processorList = new ListBuffer[String]()
+    val containerCount = new JobConfig(config).getContainerCount
+    for (i <- 0 until containerCount) {
+      processorList += i.toString
+    }
+
+    val jobModelManager = getJobModelManager(config, 
previousChangelogPartitionMapping, localityManager,
+      streamMetadataCache, processorList.toList.asJava)
     val jobModel = jobModelManager.jobModel
     // Save the changelog mapping back to the ChangelogPartitionmanager
     // newChangelogPartitionMapping is the merging of all current 
task:changelog
@@ -211,7 +222,13 @@ object JobModelManager extends Logging {
                    containerIds: java.util.List[String]): JobModel = {
     // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, 
streamMetadataCache)
-    val grouper = getSystemStreamPartitionGrouper(config)
+
+    // processor list is required by some of the groupers. So, let's pass them 
as part of the config.
+    // Copy the config and add the processor list to the config copy.
+    val configMap = new util.HashMap[String, String](config)
+    configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", containerIds))
+    val grouper = getSystemStreamPartitionGrouper(new MapConfig(configMap))
+
     val groups = grouper.group(allSystemStreamPartitions.asJava)
     info("SystemStreamPartitionGrouper %s has grouped the 
SystemStreamPartitions into %d tasks with the following taskNames: %s" 
format(grouper, groups.size(), groups.keySet()))
 

http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestAllSspToSingleTaskGrouper.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestAllSspToSingleTaskGrouper.java
 
b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestAllSspToSingleTaskGrouper.java
new file mode 100644
index 0000000..fa3b33f
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestAllSspToSingleTaskGrouper.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestAllSspToSingleTaskGrouper {
+  private SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", 
"StreamA", new Partition(0));
+  private SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", 
"StreamA", new Partition(1));
+  private SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", 
"StreamA", new Partition(2));
+  private SystemStreamPartition ab0 = new SystemStreamPartition("SystemA", 
"StreamB", new Partition(0));
+  private AllSspToSingleTaskGrouperFactory grouperFactory = new 
AllSspToSingleTaskGrouperFactory();
+
+  @Test
+  public void testLocalStreamGroupedCorrectlyForYarn() {
+    HashSet<SystemStreamPartition> allSSPs = new HashSet<>();
+    HashMap<String, String> configMap = new HashMap<>();
+
+    configMap.put("job.container.count", "2");
+    configMap.put("processor.list", "0,1");
+
+    Config config = new MapConfig(configMap);
+
+    SystemStreamPartitionGrouper grouper = 
grouperFactory.getSystemStreamPartitionGrouper(config);
+
+    Collections.addAll(allSSPs, aa0, aa1, aa2, ab0);
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<>();
+
+    HashSet<SystemStreamPartition> partitions = new HashSet<>();
+    partitions.add(aa0);
+    partitions.add(aa1);
+    partitions.add(aa2);
+    partitions.add(ab0);
+    expectedResult.put(new TaskName("Task-0"), partitions);
+    expectedResult.put(new TaskName("Task-1"), partitions);
+
+    assertEquals(expectedResult, result);
+  }
+
+  @Test
+  public void testLocalStreamGroupedCorrectlyForPassthru() {
+    HashSet<SystemStreamPartition> allSSPs = new HashSet<>();
+    HashMap<String, String> configMap = new HashMap<>();
+
+    configMap.put("job.coordinator.factory", 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+    configMap.put("processor.id", "1");
+    configMap.put("processor.list", configMap.get("processor.id"));
+
+    Config config = new MapConfig(configMap);
+
+    SystemStreamPartitionGrouper grouper = 
grouperFactory.getSystemStreamPartitionGrouper(config);
+
+    Collections.addAll(allSSPs, aa0, aa1, aa2, ab0);
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<>();
+
+    HashSet<SystemStreamPartition> partitions = new HashSet<>();
+    partitions.add(aa0);
+    partitions.add(aa1);
+    partitions.add(aa2);
+    partitions.add(ab0);
+    expectedResult.put(new TaskName("Task-1"), partitions);
+
+    assertEquals(expectedResult, result);
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testLocalStreamWithEmptySsps() {
+    HashSet<SystemStreamPartition> allSSPs = new HashSet<>();
+    HashMap<String, String> configMap = new HashMap<>();
+
+    configMap.put("job.coordinator.factory", 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+    configMap.put("processor.list", "1");
+    Config config = new MapConfig(configMap);
+
+    SystemStreamPartitionGrouper grouper = 
grouperFactory.getSystemStreamPartitionGrouper(config);
+
+    grouper.group(allSSPs);
+  }
+
+  @Test(expected = ConfigException.class)
+  public void testLocalStreamWithBroadcastStream() {
+    HashMap<String, String> configMap = new HashMap<>();
+
+    configMap.put("task.broadcast.inputs", "test.stream#0");
+    configMap.put("processor.list", "1");
+    Config config = new MapConfig(configMap);
+
+    grouperFactory.getSystemStreamPartitionGrouper(config);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/7887b6d8/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index eb087bb..9c5dad5 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -40,8 +40,10 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
@@ -97,6 +99,8 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
 
   private String inputKafkaTopic;
   private String outputKafkaTopic;
+  private String inputSinglePartitionKafkaTopic;
+  private String outputSinglePartitionKafkaTopic;
   private ZkUtils zkUtils;
   private ApplicationConfig applicationConfig1;
   private ApplicationConfig applicationConfig2;
@@ -113,7 +117,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
   @Rule
   public final ExpectedException expectedException = ExpectedException.none();
 
-//  @Override
+  //  @Override
   public void setUp() {
     super.setUp();
     String uniqueTestId = UUID.randomUUID().toString();
@@ -121,6 +125,8 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     testStreamAppId = String.format("test-app-id-%s", uniqueTestId);
     inputKafkaTopic = String.format("test-input-topic-%s", uniqueTestId);
     outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId);
+    inputSinglePartitionKafkaTopic = 
String.format("test-input-single-partition-topic-%s", uniqueTestId);
+    outputSinglePartitionKafkaTopic = 
String.format("test-output-single-partition-topic-%s", uniqueTestId);
 
     // Set up stream application config map with the given testStreamAppName, 
testStreamAppId and test kafka system
     // TODO: processorId should typically come up from a processorID generator 
as processor.id will be deprecated in 0.14.0+
@@ -147,15 +153,23 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
       LOGGER.info("Creating kafka topic: {}.", kafkaTopic);
       TestUtils.createTopic(zkUtils(), kafkaTopic, 5, 1, servers(), new 
Properties());
     }
+    for (String kafkaTopic : ImmutableList.of(inputSinglePartitionKafkaTopic, 
outputSinglePartitionKafkaTopic)) {
+      LOGGER.info("Creating kafka topic: {}.", kafkaTopic);
+      TestUtils.createTopic(zkUtils(), kafkaTopic, 1, 1, servers(), new 
Properties());
+    }
   }
 
-//  @Override
+  //  @Override
   public void tearDown() {
     if (zookeeper().zookeeper().isRunning()) {
       for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, 
outputKafkaTopic)) {
         LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
         AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
       }
+      for (String kafkaTopic : 
ImmutableList.of(inputSinglePartitionKafkaTopic, 
outputSinglePartitionKafkaTopic)) {
+        LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
+        AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
+      }
       zkUtils.close();
       super.tearDown();
     }
@@ -175,7 +189,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
   }
 
   private Map<String, String> buildStreamApplicationConfigMap(String 
systemName, String inputTopic,
-                                                              String appName, 
String appId) {
+      String appName, String appId) {
     Map<String, String> samzaContainerConfig = ImmutableMap.<String, 
String>builder()
         .put(TaskConfig.INPUT_STREAMS(), inputTopic)
         .put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName)
@@ -197,24 +211,25 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     return applicationConfig;
   }
 
+  /**
+   * sspGrouper is set to GroupBySystemStreamPartitionFactory.
+   * Run a stream application(streamApp1) consuming messages from input 
topic(effectively one container).
+   *
+   * In the callback triggered by streamApp1 after processing a message, bring 
up an another stream application(streamApp2).
+   *
+   * Assertions:
+   *           A) JobModel generated before and after the addition of 
streamApp2 should be equal.
+   *           B) Second stream application(streamApp2) should not join the 
group and process any message.
+   */
+
   //@Test
   public void 
shouldStopNewProcessorsJoiningGroupWhenNumContainersIsGreaterThanNumTasks() 
throws InterruptedException {
-    /**
-     * sspGrouper is set to AllSspToSingleTaskGrouperFactory for this test 
case(All ssp's from input kafka topic are mapped to a single task).
-     * Run a stream application(streamApp1) consuming messages from input 
topic(effectively one container).
-     *
-     * In the callback triggered by streamApp1 after processing a message, 
bring up an another stream application(streamApp2).
-     *
-     * Assertions:
-     *           A) JobModel generated before and after the addition of 
streamApp2 should be equal.
-     *           B) Second stream application(streamApp2) should not join the 
group and process any message.
-     */
-
     // Set up kafka topics.
-    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS * 2, 
PROCESSOR_IDS[0]);
+    publishKafkaEvents(inputSinglePartitionKafkaTopic, NUM_KAFKA_EVENTS * 2, 
PROCESSOR_IDS[0]);
 
     // Configuration, verification variables
-    MapConfig testConfig = new 
MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", 
JobConfig.JOB_DEBOUNCE_TIME_MS(), "10"));
+    MapConfig testConfig = new 
MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(),
+        
"org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory",
 JobConfig.JOB_DEBOUNCE_TIME_MS(), "10"));
     // Declared as final array to update it from streamApplication 
callback(Variable should be declared final to access in lambda block).
     final JobModel[] previousJobModel = new JobModel[1];
     final String[] previousJobModelVersion = new String[1];
@@ -231,7 +246,8 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     // Set up stream app 2.
     CountDownLatch processedMessagesLatch = new 
CountDownLatch(NUM_KAFKA_EVENTS);
     LocalApplicationRunner localApplicationRunner2 = new 
LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig));
-    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, 
outputKafkaTopic, processedMessagesLatch, null, null);
+    StreamApplication streamApp2 = new 
TestStreamApplication(inputSinglePartitionKafkaTopic, 
outputSinglePartitionKafkaTopic,
+        processedMessagesLatch, null, null);
 
     // Callback handler for streamApp1.
     StreamApplicationCallback streamApplicationCallback = message -> {
@@ -251,7 +267,8 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
 
     // Set up stream app 1.
     LocalApplicationRunner localApplicationRunner1 = new 
LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig));
-    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, 
outputKafkaTopic, null, streamApplicationCallback, kafkaEventsConsumedLatch);
+    StreamApplication streamApp1 = new 
TestStreamApplication(inputSinglePartitionKafkaTopic, 
outputSinglePartitionKafkaTopic,
+        null, streamApplicationCallback, kafkaEventsConsumedLatch);
     localApplicationRunner1.run(streamApp1);
 
     kafkaEventsConsumedLatch.await();
@@ -268,6 +285,99 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
   }
 
+  /**
+   * sspGrouper is set to AllSspToSingleTaskGrouperFactory (All ssps from 
input kafka topic are mapped to a single task per container).
+   * AllSspToSingleTaskGrouperFactory should be used only with high-level 
consumers which do the partition management
+   * by themselves. Using the factory with the consumers that do not do the 
partition management will result in
+   * each processor/task consuming all the messages from all the partitions.
+   * Run a stream application(streamApp1) consuming messages from input 
topic(effectively one container).
+   *
+   * In the callback triggered by streamApp1 after processing a message, bring 
up an another stream application(streamApp2).
+   *
+   * Assertions:
+   *           A) JobModel generated before and after the addition of 
streamApp2 should not be equal.
+   *           B) Second stream application(streamApp2) should join the group 
and process all the messages.
+   */
+
+  //@Test
+  public void 
shouldUpdateJobModelWhenNewProcessorJoiningGroupUsingAllSspToSingleTaskGrouperFactory()
 throws InterruptedException {
+    // Set up kafka topics.
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS * 2, 
PROCESSOR_IDS[0]);
+
+    // Configuration, verification variables
+    MapConfig testConfig = new 
MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", 
JobConfig.JOB_DEBOUNCE_TIME_MS(), "10"));
+    // Declared as final array to update it from streamApplication 
callback(Variable should be declared final to access in lambda block).
+    final JobModel[] previousJobModel = new JobModel[1];
+    final String[] previousJobModelVersion = new String[1];
+    AtomicBoolean hasSecondProcessorJoined = new AtomicBoolean(false);
+    final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
+
+    zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
+        // When streamApp2 with id: PROCESSOR_IDS[1] is registered, start 
processing message in streamApp1.
+        if (currentChilds.contains(PROCESSOR_IDS[1])) {
+          secondProcessorRegistered.countDown();
+        }
+      });
+
+    // Set up streamApp2.
+    CountDownLatch processedMessagesLatch = new 
CountDownLatch(NUM_KAFKA_EVENTS * 2);
+    LocalApplicationRunner localApplicationRunner2 = new 
LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig));
+    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, 
outputKafkaTopic, processedMessagesLatch, null, null);
+
+    // Callback handler for streamApp1.
+    StreamApplicationCallback streamApplicationCallback = message -> {
+      if (hasSecondProcessorJoined.compareAndSet(false, true)) {
+        previousJobModelVersion[0] = zkUtils.getJobModelVersion();
+        previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
+        localApplicationRunner2.run(streamApp2);
+        try {
+          // Wait for streamApp2 to register with zookeeper.
+          secondProcessorRegistered.await();
+        } catch (InterruptedException e) {
+        }
+      }
+    };
+
+    // This is the latch for the messages received by streamApp1. Since 
streamApp1 is run first, it gets one event
+    // redelivered due to re-balancing done by Zk after the streamApp2 joins 
(See the callback above).
+    CountDownLatch kafkaEventsConsumedLatch = new 
CountDownLatch(NUM_KAFKA_EVENTS * 2 + 1);
+
+    // Set up stream app 1.
+    LocalApplicationRunner localApplicationRunner1 = new 
LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig));
+    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, 
outputKafkaTopic, null,
+        streamApplicationCallback, kafkaEventsConsumedLatch);
+    localApplicationRunner1.run(streamApp1);
+
+    kafkaEventsConsumedLatch.await();
+
+    String currentJobModelVersion = zkUtils.getJobModelVersion();
+    JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion);
+
+    // JobModelVersion check to verify that leader publishes new jobModel.
+    assertTrue(Integer.parseInt(previousJobModelVersion[0]) < 
Integer.parseInt(currentJobModelVersion));
+
+    // Job model before and after the addition of second stream processor 
should not be the same.
+    assertTrue(!previousJobModel[0].equals(updatedJobModel));
+
+    // Task names in the job model should be different but the set of 
partitions should be the same and each task name
+    // should be assigned to a different container.
+    
assertEquals(previousJobModel[0].getContainers().get(PROCESSOR_IDS[0]).getTasks().size(),
 1);
+    
assertEquals(updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks().size(),
 1);
+    
assertEquals(updatedJobModel.getContainers().get(PROCESSOR_IDS[1]).getTasks().size(),
 1);
+    Map<TaskName, TaskModel> updatedTaskModelMap1 = 
updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks();
+    Map<TaskName, TaskModel> updatedTaskModelMap2 = 
updatedJobModel.getContainers().get(PROCESSOR_IDS[1]).getTasks();
+    assertEquals(updatedTaskModelMap1.size(), 1);
+    assertEquals(updatedTaskModelMap2.size(), 1);
+
+    TaskModel taskModel1 = 
updatedTaskModelMap1.values().stream().findFirst().get();
+    TaskModel taskModel2 = 
updatedTaskModelMap2.values().stream().findFirst().get();
+    assertEquals(taskModel1.getSystemStreamPartitions(), 
taskModel2.getSystemStreamPartitions());
+    
assertTrue(!taskModel1.getTaskName().getTaskName().equals(taskModel2.getTaskName().getTaskName()));
+
+    // TODO: After SAMZA-1364 add assertion for 
localApplicationRunner2.status(streamApp)
+    processedMessagesLatch.await();
+  }
+
   //@Test
   public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException {
     // Set up kafka topics.
@@ -495,7 +605,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     }
 
     static TestKafkaEvent fromString(String message) {
-      String[] messageComponents = message.split("|");
+      String[] messageComponents = message.split("\\|");
       return new TestKafkaEvent(messageComponents[0], messageComponents[1]);
     }
   }
@@ -513,8 +623,8 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     private final CountDownLatch kafkaEventsConsumedLatch;
 
     TestStreamApplication(String inputTopic, String outputTopic,
-                          CountDownLatch processedMessagesLatch,
-                          StreamApplicationCallback streamApplicationCallback, 
CountDownLatch kafkaEventsConsumedLatch) {
+        CountDownLatch processedMessagesLatch,
+        StreamApplicationCallback streamApplicationCallback, CountDownLatch 
kafkaEventsConsumedLatch) {
       this.inputTopic = inputTopic;
       this.outputTopic = outputTopic;
       this.processedMessagesLatch = processedMessagesLatch;

Reply via email to