Repository: samza Updated Branches: refs/heads/master 1b3b36b6e -> de52eea07
SAMZA-41: allow static partition assignment to Samza jobs Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/de52eea0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/de52eea0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/de52eea0 Branch: refs/heads/master Commit: de52eea07cd5ebb26cdfcf2afe9975dd9ab30623 Parents: 1b3b36b Author: Jose <[email protected]> Authored: Tue Jun 21 15:14:06 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Tue Jun 21 15:14:06 2016 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 81 ++++++++++++++ .../system/SystemStreamPartitionMatcher.java | 28 +++++ .../org/apache/samza/config/JobConfig.scala | 25 +++++ .../samza/coordinator/JobCoordinator.scala | 28 ++++- .../RangeSystemStreamPartitionMatcher.scala | 55 ++++++++++ .../RegexSystemStreamPartitionMatcher.scala | 35 ++++++ .../samza/coordinator/TestJobCoordinator.scala | 94 +++++++++++++++- .../TestRangeSystemStreamPartitionMatcher.scala | 109 +++++++++++++++++++ .../TestRegexSystemStreamPartitionMatcher.scala | 79 ++++++++++++++ 9 files changed, 531 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index c1c822e..ea8d5e9 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -252,6 +252,87 @@ </dl> </td> </tr> + + <tr> + <td class="property" 
id="job-systemstreampartition-matcher-class">job.systemstreampartition.matcher.class</td> + <td class="default"></td> + <td class="description"> + If you want to enable static partition assignment, then this is a <strong>required</strong> configuration. + The value of this property is a fully-qualified Java class name that implements the interface + <code>org.apache.samza.system.SystemStreamPartitionMatcher</code>. + Samza ships with two matcher classes: + <dl> + <dt><code>org.apache.samza.system.RangeSystemStreamPartitionMatcher</code></dt> + <dd>This class uses a comma-separated list of range(s) to determine which partitions match + and are thus statically assigned to the job. For example, "2,3,1-2" statically assigns partitions + 1, 2, and 3 for all the specified systems and streams (topics in case of Kafka) to the job. + For config validation, each element in the comma-separated list must conform to one of the + following regexes: + <ul> + <li><code>"(\\d+)"</code> or </li> + <li><code>"(\\d+-\\d+)"</code> </li> + </ul> + The <code>JobConfig.SSP_MATCHER_CLASS_RANGE</code> constant holds the canonical name of this class. + </dd> + </dl> + <dl> + <dt><code>org.apache.samza.system.RegexSystemStreamPartitionMatcher</code></dt> + <dd>This class uses a standard Java regex to determine which partitions match + and are thus statically assigned to the job. For example, "[1-2]" statically assigns partitions 1 and 2 + for all the specified systems and streams (topics in case of Kafka) to the job.
+ The <code>JobConfig.SSP_MATCHER_CLASS_REGEX</code> constant holds the canonical name of this class.</dd> + </dl> + </td> + </tr> + + <tr> + <td class="property" id="job_systemstreampartition_matcher_config_ranges">job.systemstreampartition.matcher.config.ranges</td> + <td class="default"></td> + <td class="description"> + If <code>job.systemstreampartition.matcher.class</code> is specified and its value is + <code>org.apache.samza.system.RangeSystemStreamPartitionMatcher</code>, then this property is a + <strong>required</strong> configuration. Specify a comma-separated list of range(s) to determine which + partitions match and are thus statically assigned to the job. For example, "2,3,11-20" statically assigns + partitions 2, 3, and 11 through 20 for all the specified systems and streams (topics in case of Kafka) to the job. + A single configuration value like "19" is valid as well; this statically assigns partition 19. + Both bounds of a range are inclusive, and overlapping ranges are allowed. + For config validation, each element in the comma-separated list must conform to one of the + following regexes: + <ul> + <li><code>"(\\d+)"</code> or </li> + <li><code>"(\\d+-\\d+)"</code> </li> + </ul> + </td> + </tr> + + <tr> + <td class="property" id="job_systemstreampartition_matcher_config_regex">job.systemstreampartition.matcher.config.regex</td> + <td class="default"></td> + <td class="description"> + If <code>job.systemstreampartition.matcher.class</code> is specified and its value is + <code>org.apache.samza.system.RegexSystemStreamPartitionMatcher</code>, then this property is a + <strong>required</strong> configuration. The value must be a valid Java regex; it is matched against the entire partition id. For example, "[1-2]" + statically assigns partitions 1 and 2 for all the specified systems and streams (topics in case of Kafka) to the job.
+ </td> + </tr> + + <tr> + <td class="property" id="job_systemstreampartition_matcher_config_job_factory_regex">job.systemstreampartition.matcher.config.job.factory.regex</td> + <td class="default"></td> + <td class="description"> + Specifies a Java regex that must match the entire <code>StreamJobFactory</code> class name configured in + <code>job.factory.class</code> for static partition assignment to be enabled. This allows the partition + assignment feature to be used with custom <code>StreamJobFactory</code> implementations as well. + <p> + This config defaults to the following value: + <code>"org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)"</code>, + which enables static partition assignment when <code>job.factory.class</code> is set to + <code>org.apache.samza.job.local.ProcessJobFactory</code> or <code>org.apache.samza.job.local.ThreadJobFactory</code>. + </p> + </td> + </tr> + + <tr> <td class="property" id="job-checkpoint-validation-enabled">job.checkpoint.<br>validation.enabled</td> <td class="default">true</td> http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionMatcher.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionMatcher.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionMatcher.java new file mode 100644 index 0000000..bea2187 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionMatcher.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system; + +import org.apache.samza.config.Config; + +import java.util.Set; + +public interface SystemStreamPartitionMatcher { + Set<SystemStreamPartition> filter(Set<SystemStreamPartition> systemStreamPartitions, Config config); +} http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 23a51b2..49b08f6 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -23,6 +23,7 @@ package org.apache.samza.config import java.io.File import org.apache.samza.container.grouper.stream.GroupByPartitionFactory +import org.apache.samza.system.{RegexSystemStreamPartitionMatcher, SystemStreamPartitionMatcher} import org.apache.samza.util.Logging object JobConfig { @@ -46,6 +47,21 @@ object JobConfig { val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor" val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes" val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory" + + val SSP_MATCHER_CLASS = "job.systemstreampartition.matcher.class"; + + val 
SSP_MATCHER_CLASS_REGEX = "org.apache.samza.system.RegexSystemStreamPartitionMatcher" + + val SSP_MATCHER_CLASS_RANGE = "org.apache.samza.system.RangeSystemStreamPartitionMatcher" + + val SSP_MATCHER_CONFIG_REGEX = "job.systemstreampartition.matcher.config.regex"; + + val SSP_MATCHER_CONFIG_RANGES = "job.systemstreampartition.matcher.config.ranges"; + + val SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX = "job.systemstreampartition.matcher.config.job.factory.regex"; + + val DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX = "org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)"; + // number of partitions in the checkpoint stream should be 1. But sometimes, // if a stream was created(automatically) with the wrong number of partitions(default number of partitions // for new streams), there is no easy fix for the user (topic deletion or reducing of number of partitions @@ -142,4 +158,13 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { case _ => "26214400" } } + + def getSSPMatcherClass = getOption(JobConfig.SSP_MATCHER_CLASS) + + def getSSPMatcherConfigRegex = getExcept(JobConfig.SSP_MATCHER_CONFIG_REGEX) + + def getSSPMatcherConfigRanges = getExcept(JobConfig.SSP_MATCHER_CONFIG_RANGES) + + def getSSPMatcherConfigJobFactoryRegex = getOrElse(JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, JobConfig.DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX) + } http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index e9a5108..0aee4ce 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -35,11 +35,13 @@ import 
org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory import org.apache.samza.job.model.{JobModel, TaskModel} import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.storage.ChangelogPartitionManager -import org.apache.samza.system.{ExtendedSystemAdmin, StreamMetadataCache, SystemFactory, SystemStreamPartition} +import org.apache.samza.system.{ExtendedSystemAdmin, StreamMetadataCache, SystemFactory, SystemStreamPartition, SystemStreamPartitionMatcher} import org.apache.samza.util.{Logging, Util} import org.apache.samza.{Partition, SamzaException} import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + /** * Helper companion object that is responsible for wiring up a JobCoordinator @@ -145,6 +147,28 @@ object JobModelManager extends Logging { }.toSet } + def getMatchedInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache) : Set[SystemStreamPartition] = { + val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache) + config.getSSPMatcherClass match { + case Some(s) => { + val jfr = config.getSSPMatcherConfigJobFactoryRegex.r + config.getStreamJobFactoryClass match { + case Some(jfr(_*)) => { + info("before match: allSystemStreamPartitions.size = %s" format (allSystemStreamPartitions.size)) + val sspMatcher = Util.getObj[SystemStreamPartitionMatcher](s) + val matchedPartitions = sspMatcher.filter(allSystemStreamPartitions, config).asScala.toSet + // Usually a small set hence ok to log at info level + info("after match: matchedPartitions = %s" format (matchedPartitions)) + matchedPartitions + } + case _ => allSystemStreamPartitions + } + } + case _ => allSystemStreamPartitions + } + } + + /** * Gets a SystemStreamPartitionGrouper object from the configuration. 
*/ @@ -163,7 +187,7 @@ object JobModelManager extends Logging { localityManager: LocalityManager, streamMetadataCache: StreamMetadataCache): JobModel = { // Do grouping to fetch TaskName to SSP mapping - val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache) + val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache) val grouper = getSystemStreamPartitionGrouper(config) val groups = grouper.group(allSystemStreamPartitions) info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups.keySet())) http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/main/scala/org/apache/samza/system/RangeSystemStreamPartitionMatcher.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/RangeSystemStreamPartitionMatcher.scala b/samza-core/src/main/scala/org/apache/samza/system/RangeSystemStreamPartitionMatcher.scala new file mode 100644 index 0000000..85bb271 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/system/RangeSystemStreamPartitionMatcher.scala @@ -0,0 +1,55 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. 
See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.system + +import java.util + +import org.apache.samza.SamzaException +import org.apache.samza.config.{JobConfig, Config} +import org.apache.samza.config.JobConfig.Config2Job +import scala.collection.JavaConverters._ + +class RangeSystemStreamPartitionMatcher extends SystemStreamPartitionMatcher { + + override def filter(systemStreamPartitions: util.Set[SystemStreamPartition], config: Config): util.Set[SystemStreamPartition] = { + val sspRanges = config.getSSPMatcherConfigRanges + val ranges: Array[String] = sspRanges.split(",") + val rangeMap = collection.mutable.Map[Int, Int]() + + // Accept single or multiple partition ranges in one config - 2,3,7-9,1 or 19 + // Overlapping ranges are fine + val rxS = "(\\d+)".r // single digits + val rxR = "(\\d+-\\d+)".r // range like 7-9 + + ranges.foreach({ + case rxS(s) => rangeMap.put(s.toInt, s.toInt) + case rxR(r) => + val s = r.split("-") + (s(0).toInt to s(1).toInt).foreach(k => rangeMap.put(k, k)) + case _ => + val error = "Invalid partition range configuration '%s': %s" + .format(JobConfig.SSP_MATCHER_CONFIG_RANGES, sspRanges) + throw new SamzaException(error) + }) + val sspSetScala = systemStreamPartitions.asScala + sspSetScala.filter(s => rangeMap.contains(s.partition.getPartitionId)).asJava + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/main/scala/org/apache/samza/system/RegexSystemStreamPartitionMatcher.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/RegexSystemStreamPartitionMatcher.scala b/samza-core/src/main/scala/org/apache/samza/system/RegexSystemStreamPartitionMatcher.scala new file mode 100644 index 0000000..0aaa432 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/system/RegexSystemStreamPartitionMatcher.scala @@ -0,0 +1,35 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system + +import java.util + +import org.apache.samza.config.Config +import org.apache.samza.config.JobConfig.Config2Job +import scala.collection.JavaConverters._ + + +class RegexSystemStreamPartitionMatcher extends SystemStreamPartitionMatcher { + override def filter(systemStreamPartitions: util.Set[SystemStreamPartition], config: Config): util.Set[SystemStreamPartition] = { + val sspRegex = config.getSSPMatcherConfigRegex + val sspSetScala = systemStreamPartitions.asScala + sspSetScala.filter(s => s.partition.getPartitionId.toString.matches(sspRegex)).asJava + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index 55a879b..fcabc69 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -21,6 +21,9 @@ package org.apache.samza.coordinator import java.util +import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory +import org.apache.samza.job.MockJobFactory +import org.apache.samza.job.local.{ProcessJobFactory, ThreadJobFactory} import org.apache.samza.serializers.model.SamzaObjectMapper import org.apache.samza.util.Util import org.junit.{After, Test} @@ -34,7 +37,7 @@ import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.config.Config import org.apache.samza.system._ import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.Partition +import org.apache.samza.{SamzaException, Partition} import org.apache.samza.job.model.JobModel import org.apache.samza.job.model.ContainerModel import org.apache.samza.job.model.TaskModel @@ -179,6 +182,95 @@ class TestJobCoordinator { coordinator.stop } + /** + * Builds a coordinator from config, and then compares it with what was expected. + * We initialize with 3 partitions. We use the provided SSP_MATCHER_CLASS and + * specify a regex ("[1]", see getTestConfig) that only allows partition 1 to be assigned to the job. + * We expect that the JobModel will only have one SystemStreamPartition. + * Previous test tested without any matcher class. This test is with ThreadJobFactory. + */ + @Test + def testWithPartitionAssignmentWithThreadJobFactory { + val config = getTestConfig(classOf[ThreadJobFactory]) + val coordinator = JobModelManager(config) + + // Construct the expected JobModel, so we can compare it to + // JobCoordinator's JobModel.
+ val task1Name = new TaskName("Partition 1") + val task2Name = new TaskName("Partition 2") + val container0Tasks = Map( + task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))), new Partition(0))) + val container1Tasks = Map( + task2Name -> new TaskModel(task2Name, Set(new SystemStreamPartition("test", "stream1", new Partition(2))), new Partition(5))) + val containers = Map( + Integer.valueOf(0) -> new ContainerModel(0, container0Tasks)) + val jobModel = new JobModel(config, containers) + assertEquals(config, coordinator.jobModel.getConfig) + assertEquals(jobModel, coordinator.jobModel) + } + + /** + * Tests with ProcessJobFactory. + */ + @Test + def testWithPartitionAssignmentWithProcessJobFactory { + val config = getTestConfig(classOf[ProcessJobFactory]) + val coordinator = JobModelManager(config) + + // Construct the expected JobModel, so we can compare it to + // JobCoordinator's JobModel. + val task1Name = new TaskName("Partition 1") + + val container0Tasks = Map( + task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))), new Partition(0))) + + val containers = Map( + Integer.valueOf(0) -> new ContainerModel(0, container0Tasks)) + val jobModel = new JobModel(config, containers) + assertEquals(config, coordinator.jobModel.getConfig) + assertEquals(jobModel, coordinator.jobModel) + } + + /** + * Test with a JobFactory other than ProcessJobFactory or ThreadJobFactory so that + * the default JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX does not match it.
+ */ + @Test + def testWithPartitionAssignmentWithMockJobFactory { + val config = new SystemConfig(getTestConfig(classOf[MockJobFactory])) + + val systemNames = Set("test") + + // Map the name of each system to the corresponding SystemAdmin + val systemAdmins = systemNames.map(systemName => { + val systemFactoryClassName = config + .getSystemFactory(systemName) + .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)) + val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName) + systemName -> systemFactory.getAdmin(systemName, config) + }).toMap + + val streamMetadataCache = new StreamMetadataCache(systemAdmins) + + val allSSP = JobModelManager.getInputStreamPartitions(config, streamMetadataCache) + val matchedSSP = JobModelManager.getMatchedInputStreamPartitions(config, streamMetadataCache) + assertEquals(matchedSSP, allSSP) + } + + def getTestConfig(clazz : Class[_]) = { + val config = new MapConfig(Map( + TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName, + TaskConfig.INPUT_STREAMS -> "test.stream1", + JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator", + JobConfig.JOB_NAME -> "test", + JobConfig.STREAM_JOB_FACTORY_CLASS -> clazz.getCanonicalName, + JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX, + JobConfig.SSP_MATCHER_CONFIG_REGEX -> "[1]", + SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName, + SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName)) + config + } + def extractChangelogPartitionMapping(url : String) = { val jobModel = SamzaContainer.readJobModel(url.toString) val taskModels = jobModel.getContainers.values().flatMap(_.getTasks.values()) http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala 
---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala new file mode 100644 index 0000000..1f81589 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system + +import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory +import org.apache.samza.config._ +import org.apache.samza.coordinator.MockSystemFactory +import org.apache.samza.job.local.ThreadJobFactory +import org.apache.samza.{Partition, SamzaException} +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +class TestRangeSystemStreamPartitionMatcher { + val sspSet = mutable.Set(new SystemStreamPartition("test", "stream1", new Partition(0))) + sspSet.add(new SystemStreamPartition("test", "stream1", new Partition(1))) + sspSet.add(new SystemStreamPartition("test", "stream1", new Partition(2))) + + @Test + def testFilterWithMatcherConfigSingleRange() { + val config = getConfig("1,2") + + val expectedSspSet = mutable.Set(new SystemStreamPartition("test", "stream1", new Partition(1))) + expectedSspSet.add(new SystemStreamPartition("test", "stream1", new Partition(2))) + + val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config) + assertEquals(2, filteredSet.size) + assertEquals(expectedSspSet.asJava, filteredSet) + } + + private def getConfig(range: String): MapConfig = { + new MapConfig(mutable.Map( + TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName, + TaskConfig.INPUT_STREAMS -> "test.stream1", + JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName, + JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE, + JobConfig.SSP_MATCHER_CONFIG_RANGES -> range, + (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName)) + } + + @Test + def testFilterWithMatcherConfigRanges() { + val config = getConfig("0-1,1,2") + + val expectedSspSet = mutable.Set(new SystemStreamPartition("test", "stream1", new Partition(0))) + expectedSspSet.add(new 
SystemStreamPartition("test", "stream1", new Partition(1))) + expectedSspSet.add(new SystemStreamPartition("test", "stream1", new Partition(2))) + + val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config) + assertEquals(3, filteredSet.size) + assertEquals(expectedSspSet.asJava, filteredSet) + } + + @Test(expected = classOf[SamzaException]) + def testFilterWithInvalidMatcherConfigRange() { + val config = getConfig("--") + + val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet, config) + assertEquals(0, filteredSet.size) + } + + @Test + def testFilterWithMatcherConfigRangeWithNomatches() { + val config = getConfig("4-5") + + val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet, config) + assertEquals(0, filteredSet.size) + } + + @Test(expected = classOf[SamzaException]) + def testFilterWithEmptyMatcherConfigRange() { + val config = getConfig("") + + val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet, config) + assertEquals(0, filteredSet.size) + } + + @Test(expected = classOf[SamzaException]) + def testFilterWithNoMatcherConfigRange() { + val config = new MapConfig(mutable.Map( + TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName, + TaskConfig.INPUT_STREAMS -> "test.stream1", + JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName, + JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE, + (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName)) + + new RangeSystemStreamPartitionMatcher().filter(sspSet, config) + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/de52eea0/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala 
b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala new file mode 100644 index 0000000..aa56f0b --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system + +import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory +import org.apache.samza.config._ +import org.apache.samza.coordinator.MockSystemFactory +import org.apache.samza.job.local.ThreadJobFactory +import org.apache.samza.{Partition, SamzaException} +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable +import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ + +class TestRegexSystemStreamPartitionMatcher { + val sspSet = mutable.Set(new SystemStreamPartition("test", "stream1", new Partition(0))) + sspSet.add(new SystemStreamPartition("test", "stream1", new Partition(1))) + sspSet.add(new SystemStreamPartition("test", "stream1", new Partition(2))) + + @Test + def testFilterWithMatcherConfigRegex() { + val config = getConfig("[1-2]") + + val expectedSspSet = mutable.Set(new SystemStreamPartition("test", "stream1", new Partition(1))) + expectedSspSet.add(new SystemStreamPartition("test", "stream1", new Partition(2))) + + val filteredSet = new RegexSystemStreamPartitionMatcher().filter(sspSet.asJava, config) + assertEquals(2, filteredSet.size) + assertEquals(expectedSspSet.asJava, filteredSet) + } + + private def getConfig(regex: String): MapConfig = { + new MapConfig(mutable.Map( + TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName, + TaskConfig.INPUT_STREAMS -> "test.stream1", + JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName, + JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX, + JobConfig.SSP_MATCHER_CONFIG_REGEX -> regex, + (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName)) + } + + @Test + def testFilterWithMatcherConfigRegexWithNomatches() { + val config = getConfig("--") + + val filteredSet = new RegexSystemStreamPartitionMatcher().filter(sspSet, config) + assertEquals(0, filteredSet.size) + } + + 
@Test(expected = classOf[SamzaException]) + def testFilterWithNoMatcherConfigRegex() { + val config = new MapConfig(mutable.Map( + TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName, + TaskConfig.INPUT_STREAMS -> "test.stream1", + JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName, + JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CONFIG_REGEX, + (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName)) + + new RegexSystemStreamPartitionMatcher().filter(sspSet, config) + } +} \ No newline at end of file
