Repository: samza
Updated Branches:
  refs/heads/master 1b3b36b6e -> de52eea07


SAMZA-41: allow static partition assignment to Samza jobs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/de52eea0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/de52eea0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/de52eea0

Branch: refs/heads/master
Commit: de52eea07cd5ebb26cdfcf2afe9975dd9ab30623
Parents: 1b3b36b
Author: Jose <[email protected]>
Authored: Tue Jun 21 15:14:06 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Tue Jun 21 15:14:06 2016 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  81 ++++++++++++++
 .../system/SystemStreamPartitionMatcher.java    |  28 +++++
 .../org/apache/samza/config/JobConfig.scala     |  25 +++++
 .../samza/coordinator/JobCoordinator.scala      |  28 ++++-
 .../RangeSystemStreamPartitionMatcher.scala     |  55 ++++++++++
 .../RegexSystemStreamPartitionMatcher.scala     |  35 ++++++
 .../samza/coordinator/TestJobCoordinator.scala  |  94 +++++++++++++++-
 .../TestRangeSystemStreamPartitionMatcher.scala | 109 +++++++++++++++++++
 .../TestRegexSystemStreamPartitionMatcher.scala |  79 ++++++++++++++
 9 files changed, 531 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index c1c822e..ea8d5e9 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -252,6 +252,87 @@
                         </dl>
                     </td>
                 </tr>
+
+                <tr>
+                    <td class="property" 
id="job-systemstreampartition-matcher-class">job.systemstreampartition.matcher.class</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        If you want to enable static partition assignment, 
then this is a <strong>required</strong> configuration.
+                        The value of this property is a fully-qualified Java 
class name that implements the interface
+                        
<code>org.apache.samza.system.SystemStreamPartitionMatcher</code>.
+                        Samza ships with two matcher classes:
+                        <dl>
+                            
<dt><code>org.apache.samza.system.RangeSystemStreamPartitionMatcher</code></dt>
+                            <dd>This class uses a comma separated list of 
range(s) to determine which partitions match,
+                                and are thus statically assigned to the Job. For 
example "2,3,1-2", statically assigns partition
+                                1, 2, and 3 for all the specified system and 
streams (topics in case of Kafka) to the job.
+                                For config validation each element in the 
comma separated list must conform to one of the
+                                following regex:
+                                <ul>
+                                    <li><code>"(\\d+)"</code> or </li>
+                                    <li><code>"(\\d+-\\d+)"</code> </li>
+                                </ul>
+                                <code>JobConfig.SSP_MATCHER_CLASS_RANGE</code> 
constant has the canonical name of this class.
+                            </dd>
+                        </dl>
+                        <dl>
+                            
<dt><code>org.apache.samza.system.RegexSystemStreamPartitionMatcher</code></dt>
+                            <dd>This class uses a standard Java supported 
regex to determine which partition matches,
+                                and thus statically assigned to the Job. For 
example "[1-2]", statically assigns partition 1 and 2
+                                for all the specified system and streams 
(topics in case of Kafka) to the job.
+                                <code>JobConfig.SSP_MATCHER_CLASS_REGEX</code> 
constant has the canonical name of this class.</dd>
+                        </dl>
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" 
id="job_systemstreampartition_matcher_config_range">job.systemstreampartition.matcher.config.ranges</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        If 
<code>job.systemstreampartition.matcher.class</code> is specified, and the 
value of this property is
+                        
<code>org.apache.samza.system.RangeSystemStreamPartitionMatcher</code>, then 
this property is a
+                        <strong>required</strong> configuration. Specify a 
comma separated list of range(s) to determine which
+                        partition matches, and thus statically assigned to the 
Job. For example "2,3,11-20", statically assigns
+                        partition 2, 3, and 11 to 20 for all the specified 
system and streams (topics in case of Kafka) to the job.
+                        A single configuration value like "19" is valid as 
well. This statically assigns partition 19.
+                        For config validation each element in the comma 
separated list must conform to one of the
+                        following regex:
+                        <ul>
+                            <li><code>"(\\d+)"</code> or </li>
+                            <li><code>"(\\d+-\\d+)"</code> </li>
+                        </ul>
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" 
id="job_systemstreampartition_matcher_config_regex">job.systemstreampartition.matcher.config.regex</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        If 
<code>job.systemstreampartition.matcher.class</code> is specified, and the 
value of this property is
+                        
<code>org.apache.samza.system.RegexSystemStreamPartitionMatcher</code>, then 
this property is a
+                        <strong>required</strong> configuration. The value 
should be a valid Java supported regex. For example "[1-2]",
+                        statically assigns partition 1 and 2 for all the 
specified system and streams (topics in case of Kafka) to the job.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" 
+                    id="job_systemstreampartition_matcher_config_job_factory_regex">job.systemstreampartition.matcher.config.job.factory.regex</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        This configuration can be used to specify the Java 
supported regex to match the <code>StreamJobFactory</code>
+                        for which the static partition assignment should be 
enabled. This configuration enables the partition
+                        assignment feature to be used for custom 
<code>StreamJobFactory</code>(ies) as well.
+                        <p>
+                            This config defaults to the following value:
+                            
<code>"org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)"</code>,
+                            which enables static partition assignment when 
<code>job.factory.class</code> is set to
+                            
<code>org.apache.samza.job.local.ProcessJobFactory</code> or 
<code>org.apache.samza.job.local.ThreadJobFactory</code>.
+                        </p>
+                    </td>
+                </tr>
+
+
                 <tr>
                     <td class="property" 
id="job-checkpoint-validation-enabled">job.checkpoint.<br>validation.enabled</td>
                     <td class="default">true</td>

http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionMatcher.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionMatcher.java
 
b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionMatcher.java
new file mode 100644
index 0000000..bea2187
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionMatcher.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import org.apache.samza.config.Config;
+
+import java.util.Set;
+
+public interface SystemStreamPartitionMatcher {
+  Set<SystemStreamPartition> filter(Set<SystemStreamPartition> 
systemStreamPartitions, Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 23a51b2..49b08f6 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -23,6 +23,7 @@ package org.apache.samza.config
 import java.io.File
 
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.system.{RegexSystemStreamPartitionMatcher, 
SystemStreamPartitionMatcher}
 import org.apache.samza.util.Logging
 
 object JobConfig {
@@ -46,6 +47,21 @@ object JobConfig {
   val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
   val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
   val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
+
+  val SSP_MATCHER_CLASS = "job.systemstreampartition.matcher.class";
+
+  val SSP_MATCHER_CLASS_REGEX = 
"org.apache.samza.system.RegexSystemStreamPartitionMatcher"
+
+  val SSP_MATCHER_CLASS_RANGE = 
"org.apache.samza.system.RangeSystemStreamPartitionMatcher"
+
+  val SSP_MATCHER_CONFIG_REGEX = 
"job.systemstreampartition.matcher.config.regex";
+
+  val SSP_MATCHER_CONFIG_RANGES = 
"job.systemstreampartition.matcher.config.ranges";
+
+  val SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX = 
"job.systemstreampartition.matcher.config.job.factory.regex";
+
+  val DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX = 
"org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)";
+
   // number of partitions in the checkpoint stream should be 1. But sometimes,
   // if a stream was created(automatically) with the wrong number of 
partitions(default number of partitions
   // for new streams), there is no easy fix for the user (topic deletion or 
reducing of number of partitions
@@ -142,4 +158,13 @@ class JobConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
         case _ => "26214400"
       }
   }
+
+  def getSSPMatcherClass = getOption(JobConfig.SSP_MATCHER_CLASS)
+
+  def getSSPMatcherConfigRegex = getExcept(JobConfig.SSP_MATCHER_CONFIG_REGEX)
+
+  def getSSPMatcherConfigRanges = 
getExcept(JobConfig.SSP_MATCHER_CONFIG_RANGES)
+
+  def getSSPMatcherConfigJobFactoryRegex = 
getOrElse(JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, 
JobConfig.DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX)
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index e9a5108..0aee4ce 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -35,11 +35,13 @@ import 
org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.job.model.{JobModel, TaskModel}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.system.{ExtendedSystemAdmin, StreamMetadataCache, 
SystemFactory, SystemStreamPartition}
+import org.apache.samza.system.{ExtendedSystemAdmin, StreamMetadataCache, 
SystemFactory, SystemStreamPartition, SystemStreamPartitionMatcher}
 import org.apache.samza.util.{Logging, Util}
 import org.apache.samza.{Partition, SamzaException}
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
 
 /**
  * Helper companion object that is responsible for wiring up a JobCoordinator
@@ -145,6 +147,28 @@ object JobModelManager extends Logging {
       }.toSet
   }
 
+  def getMatchedInputStreamPartitions(config: Config, streamMetadataCache: 
StreamMetadataCache) : Set[SystemStreamPartition] = {
+    val allSystemStreamPartitions = getInputStreamPartitions(config, 
streamMetadataCache)
+    config.getSSPMatcherClass match {
+      case Some(s) => {
+        val jfr = config.getSSPMatcherConfigJobFactoryRegex.r
+        config.getStreamJobFactoryClass match {
+          case Some(jfr(_*)) => {
+            info("before match: allSystemStreamPartitions.size = %s" format 
(allSystemStreamPartitions.size))
+            val sspMatcher = Util.getObj[SystemStreamPartitionMatcher](s)
+            val matchedPartitions = 
sspMatcher.filter(allSystemStreamPartitions, config).asScala.toSet
+            // Usually a small set hence ok to log at info level
+            info("after match: matchedPartitions = %s" format 
(matchedPartitions))
+            matchedPartitions
+          }
+          case _ => allSystemStreamPartitions
+        }
+      }
+      case _ => allSystemStreamPartitions
+    }
+  }
+
+
   /**
    * Gets a SystemStreamPartitionGrouper object from the configuration.
    */
@@ -163,7 +187,7 @@ object JobModelManager extends Logging {
                                  localityManager: LocalityManager,
                                  streamMetadataCache: StreamMetadataCache): 
JobModel = {
     // Do grouping to fetch TaskName to SSP mapping
-    val allSystemStreamPartitions = getInputStreamPartitions(config, 
streamMetadataCache)
+    val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, 
streamMetadataCache)
     val grouper = getSystemStreamPartitionGrouper(config)
     val groups = grouper.group(allSystemStreamPartitions)
     info("SystemStreamPartitionGrouper %s has grouped the 
SystemStreamPartitions into %d tasks with the following taskNames: %s" 
format(grouper, groups.size(), groups.keySet()))

http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/main/scala/org/apache/samza/system/RangeSystemStreamPartitionMatcher.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/RangeSystemStreamPartitionMatcher.scala
 
b/samza-core/src/main/scala/org/apache/samza/system/RangeSystemStreamPartitionMatcher.scala
new file mode 100644
index 0000000..85bb271
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/system/RangeSystemStreamPartitionMatcher.scala
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.system
+
+import java.util
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.{JobConfig, Config}
+import org.apache.samza.config.JobConfig.Config2Job
+import scala.collection.JavaConverters._
+
+class RangeSystemStreamPartitionMatcher extends SystemStreamPartitionMatcher {
+
+  override def filter(systemStreamPartitions: util.Set[SystemStreamPartition], 
config: Config): util.Set[SystemStreamPartition] = {
+    val sspRanges = config.getSSPMatcherConfigRanges
+    val ranges: Array[String] = sspRanges.split(",")
+    val rangeMap = collection.mutable.Map[Int, Int]()
+
+    // Accept single or multiple partition ranges in one config - 2,3,7-9,1 or 
19
+    // Overlapping ranges are fine
+    val rxS = "(\\d+)".r         // single digits
+    val rxR = "(\\d+-\\d+)".r   // range like 7-9
+
+    ranges.foreach({
+      case rxS(s) => rangeMap.put(s.toInt, s.toInt)
+      case rxR(r) =>
+        val s = r.split("-")
+        (s(0).toInt to s(1).toInt).foreach(k => rangeMap.put(k, k))
+      case _ =>
+        val error = "Invalid partition range configuration '%s': %s"
+          .format(JobConfig.SSP_MATCHER_CONFIG_RANGES, sspRanges)
+        throw new SamzaException(error)
+    })
+    val sspSetScala = systemStreamPartitions.asScala
+    sspSetScala.filter(s => 
rangeMap.contains(s.partition.getPartitionId)).asJava
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/main/scala/org/apache/samza/system/RegexSystemStreamPartitionMatcher.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/RegexSystemStreamPartitionMatcher.scala
 
b/samza-core/src/main/scala/org/apache/samza/system/RegexSystemStreamPartitionMatcher.scala
new file mode 100644
index 0000000..0aaa432
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/system/RegexSystemStreamPartitionMatcher.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import java.util
+
+import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
+import scala.collection.JavaConverters._
+
+
+class RegexSystemStreamPartitionMatcher extends SystemStreamPartitionMatcher {
+  override def filter(systemStreamPartitions: util.Set[SystemStreamPartition], 
config: Config): util.Set[SystemStreamPartition] = {
+    val sspRegex = config.getSSPMatcherConfigRegex
+    val sspSetScala = systemStreamPartitions.asScala
+    sspSetScala.filter(s => 
s.partition.getPartitionId.toString.matches(sspRegex)).asJava
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 55a879b..fcabc69 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -21,6 +21,9 @@ package org.apache.samza.coordinator
 
 import java.util
 
+import 
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
+import org.apache.samza.job.MockJobFactory
+import org.apache.samza.job.local.{ProcessJobFactory, ThreadJobFactory}
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.util.Util
 import org.junit.{After, Test}
@@ -34,7 +37,7 @@ import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.Config
 import org.apache.samza.system._
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.Partition
+import org.apache.samza.{SamzaException, Partition}
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.TaskModel
@@ -179,6 +182,95 @@ class TestJobCoordinator {
     coordinator.stop
   }
 
+  /**
+    * Builds a coordinator from config, and then compares it with what was 
expected.
+    * We initialize with 3 partitions. We use the provided SSP_MATCHER_CLASS 
and
+    * specify a regex ("[1]") to only allow partition 1 to be assigned to the job.
+    * We expect that the JobModel will only have one SystemStreamPartition.
+    * Previous test tested without any matcher class. This test is with 
ThreadJobFactory.
+    */
+  @Test
+  def testWithPartitionAssignmentWithThreadJobFactory {
+    val config = getTestConfig(classOf[ThreadJobFactory])
+    val coordinator = JobModelManager(config)
+
+    // Construct the expected JobModel, so we can compare it to
+    // JobCoordinator's JobModel.
+    val task1Name = new TaskName("Partition 1")
+    val task2Name = new TaskName("Partition 2")
+    val container0Tasks = Map(
+      task1Name -> new TaskModel(task1Name, Set(new 
SystemStreamPartition("test", "stream1", new Partition(1))), new Partition(0)))
+    val container1Tasks = Map(
+      task2Name -> new TaskModel(task2Name, Set(new 
SystemStreamPartition("test", "stream1", new Partition(2))), new Partition(5)))
+    val containers = Map(
+      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks))
+    val jobModel = new JobModel(config, containers)
+    assertEquals(config, coordinator.jobModel.getConfig)
+    assertEquals(jobModel, coordinator.jobModel)
+  }
+
+  /**
+    * Tests with ProcessJobFactory.
+    */
+  @Test
+  def testWithPartitionAssignmentWithProcessJobFactory {
+    val config = getTestConfig(classOf[ProcessJobFactory])
+    val coordinator = JobModelManager(config)
+
+    // Construct the expected JobModel, so we can compare it to
+    // JobCoordinator's JobModel.
+    val task1Name = new TaskName("Partition 1")
+
+    val container0Tasks = Map(
+      task1Name -> new TaskModel(task1Name, Set(new 
SystemStreamPartition("test", "stream1", new Partition(1))), new Partition(0)))
+
+    val containers = Map(
+      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks))
+    val jobModel = new JobModel(config, containers)
+    assertEquals(config, coordinator.jobModel.getConfig)
+    assertEquals(jobModel, coordinator.jobModel)
+  }
+
+  /**
+    * Test with a JobFactory other than ProcessJobFactory or ThreadJobFactory 
so that
+    * JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX does not match.
+    */
+  @Test
+  def testWithPartitionAssignmentWithMockJobFactory {
+    val config = new SystemConfig(getTestConfig(classOf[MockJobFactory]))
+
+    val systemNames = Set("test")
+
+    // Map the name of each system to the corresponding SystemAdmin
+    val systemAdmins = systemNames.map(systemName => {
+      val systemFactoryClassName = config
+        .getSystemFactory(systemName)
+        .getOrElse(throw new SamzaException("A stream uses system %s, which is 
missing from the configuration." format systemName))
+      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+      systemName -> systemFactory.getAdmin(systemName, config)
+    }).toMap
+
+    val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+
+    val allSSP = JobModelManager.getInputStreamPartitions(config, 
streamMetadataCache)
+    val matchedSSP = JobModelManager.getMatchedInputStreamPartitions(config, 
streamMetadataCache)
+    assertEquals(matchedSSP, allSSP)
+  }
+
+  def getTestConfig(clazz : Class[_]) = {
+    val config = new MapConfig(Map(
+      TaskConfig.CHECKPOINT_MANAGER_FACTORY -> 
classOf[MockCheckpointManagerFactory].getCanonicalName,
+      TaskConfig.INPUT_STREAMS -> "test.stream1",
+      JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+      JobConfig.JOB_NAME -> "test",
+      JobConfig.STREAM_JOB_FACTORY_CLASS -> clazz.getCanonicalName,
+      JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX,
+      JobConfig.SSP_MATCHER_CONFIG_REGEX -> "[1]",
+      SystemConfig.SYSTEM_FACTORY.format("test") -> 
classOf[MockSystemFactory].getCanonicalName,
+      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> 
classOf[MockCoordinatorStreamSystemFactory].getName))
+    config
+  }
+
   def extractChangelogPartitionMapping(url : String) = {
     val jobModel = SamzaContainer.readJobModel(url.toString)
     val taskModels = 
jobModel.getContainers.values().flatMap(_.getTasks.values())

http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
 
b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
new file mode 100644
index 0000000..1f81589
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system
+
+import 
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
+import org.apache.samza.config._
+import org.apache.samza.coordinator.MockSystemFactory
+import org.apache.samza.job.local.ThreadJobFactory
+import org.apache.samza.{Partition, SamzaException}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class TestRangeSystemStreamPartitionMatcher {
+  val sspSet = mutable.Set(new SystemStreamPartition("test", "stream1", new 
Partition(0)))
+  sspSet.add(new SystemStreamPartition("test", "stream1", new Partition(1)))
+  sspSet.add(new SystemStreamPartition("test", "stream1", new Partition(2)))
+
+  @Test
+  def testFilterWithMatcherConfigSingleRange() {
+    val config = getConfig("1,2")
+
+    val expectedSspSet = mutable.Set(new SystemStreamPartition("test", 
"stream1", new Partition(1)))
+    expectedSspSet.add(new SystemStreamPartition("test", "stream1", new 
Partition(2)))
+
+    val filteredSet = new 
RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
+    assertEquals(2, filteredSet.size)
+    assertEquals(expectedSspSet.asJava, filteredSet)
+  }
+
+  private def getConfig(range: String): MapConfig = {
+    new MapConfig(mutable.Map(
+      TaskConfig.CHECKPOINT_MANAGER_FACTORY -> 
classOf[MockCheckpointManagerFactory].getCanonicalName,
+      TaskConfig.INPUT_STREAMS -> "test.stream1",
+      JobConfig.STREAM_JOB_FACTORY_CLASS -> 
classOf[ThreadJobFactory].getCanonicalName,
+      JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE,
+      JobConfig.SSP_MATCHER_CONFIG_RANGES -> range,
+      (SystemConfig.SYSTEM_FACTORY format "test") -> 
classOf[MockSystemFactory].getCanonicalName))
+  }
+
+  @Test
+  def testFilterWithMatcherConfigRanges() {
+    val config = getConfig("0-1,1,2")
+
+    val expectedSspSet = mutable.Set(new SystemStreamPartition("test", 
"stream1", new Partition(0)))
+    expectedSspSet.add(new SystemStreamPartition("test", "stream1", new 
Partition(1)))
+    expectedSspSet.add(new SystemStreamPartition("test", "stream1", new 
Partition(2)))
+
+    val filteredSet = new 
RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
+    assertEquals(3, filteredSet.size)
+    assertEquals(expectedSspSet.asJava, filteredSet)
+  }
+
+  @Test(expected = classOf[SamzaException])
+  def testFilterWithInvalidMatcherConfigRange() {
+    val config = getConfig("--")
+
+    val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet, 
config)
+    assertEquals(0, filteredSet.size)
+  }
+
+  @Test
+  def testFilterWithMatcherConfigRangeWithNomatches() {
+    val config = getConfig("4-5")
+
+    val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet, 
config)
+    assertEquals(0, filteredSet.size)
+  }
+
+  @Test(expected = classOf[SamzaException])
+  def testFilterWithEmptyMatcherConfigRange() {
+    val config = getConfig("")
+
+    val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet, 
config)
+    assertEquals(0, filteredSet.size)
+  }
+
+  @Test(expected = classOf[SamzaException])
+  def testFilterWithNoMatcherConfigRange() {
+    val config = new MapConfig(mutable.Map(
+      TaskConfig.CHECKPOINT_MANAGER_FACTORY -> 
classOf[MockCheckpointManagerFactory].getCanonicalName,
+      TaskConfig.INPUT_STREAMS -> "test.stream1",
+      JobConfig.STREAM_JOB_FACTORY_CLASS -> 
classOf[ThreadJobFactory].getCanonicalName,
+      JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE,
+      (SystemConfig.SYSTEM_FACTORY format "test") -> 
classOf[MockSystemFactory].getCanonicalName))
+
+    new RangeSystemStreamPartitionMatcher().filter(sspSet, config)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
 
b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
new file mode 100644
index 0000000..aa56f0b
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system
+
+import 
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
+import org.apache.samza.config._
+import org.apache.samza.coordinator.MockSystemFactory
+import org.apache.samza.job.local.ThreadJobFactory
+import org.apache.samza.{Partition, SamzaException}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+ * Unit tests for RegexSystemStreamPartitionMatcher, run against a fixture of
+ * three partitions (0, 1 and 2) of stream "stream1" on system "test".
+ */
+class TestRegexSystemStreamPartitionMatcher {
+  // Fixture shared by every test: partitions 0, 1 and 2 of test.stream1.
+  val sspSet = mutable.Set(
+    new SystemStreamPartition("test", "stream1", new Partition(0)),
+    new SystemStreamPartition("test", "stream1", new Partition(1)),
+    new SystemStreamPartition("test", "stream1", new Partition(2)))
+
+  /**
+   * With the regex "[1-2]" the filter is expected to keep exactly partitions
+   * 1 and 2 of the fixture.
+   */
+  @Test
+  def testFilterWithMatcherConfigRegex(): Unit = {
+    val config = getConfig("[1-2]")
+
+    val expectedSspSet = mutable.Set(
+      new SystemStreamPartition("test", "stream1", new Partition(1)),
+      new SystemStreamPartition("test", "stream1", new Partition(2)))
+
+    val filteredSet = new RegexSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
+    assertEquals(2, filteredSet.size)
+    assertEquals(expectedSspSet.asJava, filteredSet)
+  }
+
+  /**
+   * Builds a job config that selects the regex matcher class and sets
+   * JobConfig.SSP_MATCHER_CONFIG_REGEX to the given value.
+   *
+   * @param regex value stored under JobConfig.SSP_MATCHER_CONFIG_REGEX
+   * @return a MapConfig holding the test job settings plus the regex
+   */
+  private def getConfig(regex: String): MapConfig = {
+    new MapConfig(mutable.Map(
+      TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName,
+      TaskConfig.INPUT_STREAMS -> "test.stream1",
+      JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
+      JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX,
+      JobConfig.SSP_MATCHER_CONFIG_REGEX -> regex,
+      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName))
+  }
+
+  /**
+   * With the regex "--" the filter is expected to return an empty set for the
+   * fixture.
+   */
+  @Test
+  def testFilterWithMatcherConfigRegexWithNomatches(): Unit = {
+    val config = getConfig("--")
+
+    val filteredSet = new RegexSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
+    assertEquals(0, filteredSet.size)
+  }
+
+  /**
+   * When the config selects the regex matcher class but does not set
+   * JobConfig.SSP_MATCHER_CONFIG_REGEX, filter is expected to throw a
+   * SamzaException.
+   */
+  @Test(expected = classOf[SamzaException])
+  def testFilterWithNoMatcherConfigRegex(): Unit = {
+    val config = new MapConfig(mutable.Map(
+      TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName,
+      TaskConfig.INPUT_STREAMS -> "test.stream1",
+      JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
+      // The matcher class entry takes the class constant, as in getConfig,
+      // not the regex config key.
+      JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX,
+      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName))
+
+    // filter is expected to throw, so there is no result to assert on.
+    new RegexSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
+  }
+}
\ No newline at end of file

Reply via email to