Repository: incubator-samza
Updated Branches:
  refs/heads/master 3a8e2f9d1 -> cc11e02dd
SAMZA-359; refactor grouper package names Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/2c1391f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/2c1391f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/2c1391f6 Branch: refs/heads/master Commit: 2c1391f6721c90e0a004ee25f746e2d627ccf2a0 Parents: 3a8e2f9 Author: Chris Riccomini <[email protected]> Authored: Wed Jul 30 15:58:40 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Wed Jul 30 15:58:40 2014 -0700 ---------------------------------------------------------------------- .../container/SystemStreamPartitionGrouper.java | 40 -------------- .../SystemStreamPartitionGrouperFactory.java | 28 ---------- .../org/apache/samza/container/TaskName.java | 2 +- .../stream/SystemStreamPartitionGrouper.java | 41 ++++++++++++++ .../SystemStreamPartitionGrouperFactory.java | 28 ++++++++++ .../org/apache/samza/config/JobConfig.scala | 2 +- .../SystemStreamPartitionTaskNameGrouper.scala | 38 ------------- .../grouper/stream/GroupByPartition.scala | 41 ++++++++++++++ .../stream/GroupBySystemStreamPartition.scala | 38 +++++++++++++ .../grouper/task/GroupByContainerCount.scala | 50 +++++++++++++++++ .../grouper/task/TaskNameGrouper.scala | 40 ++++++++++++++ .../groupers/GroupByPartition.scala | 41 -------------- .../groupers/GroupBySystemStreamPartition.scala | 38 ------------- ...leSystemStreamPartitionTaskNameGrouper.scala | 50 ----------------- .../main/scala/org/apache/samza/util/Util.scala | 9 ++-- .../SystemStreamPartitionGrouperTestBase.scala | 57 -------------------- .../grouper/stream/GroupByTestBase.scala | 57 ++++++++++++++++++++ .../grouper/stream/TestGroupByPartition.scala | 37 +++++++++++++ .../TestGroupBySystemStreamPartition.scala | 41 ++++++++++++++ .../task/TestGroupByContainerCount.scala | 54 +++++++++++++++++++ .../groupers/TestGroupByPartition.scala | 37 ------------- 
.../TestGroupBySystemStreamPartition.scala | 41 -------------- ...leSystemStreamPartitionTaskNameGrouper.scala | 54 ------------------- .../kafka/TestKafkaCheckpointManager.scala | 2 +- 24 files changed, 435 insertions(+), 431 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java deleted file mode 100644 index 897d9f5..0000000 --- a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container; - -import org.apache.samza.system.SystemStreamPartition; - -import java.util.Map; -import java.util.Set; - -/** - * Group a set of SystemStreamPartitions into logical taskNames that share a common characteristic, defined - * by the implementation. 
Each taskName has a key that uniquely describes what sets may be in it, but does - * not generally enumerate the elements of those sets. For example, a SystemStreamPartitionGrouper that - * groups SystemStreamPartitions (each with 4 partitions) by their partition, would end up generating - * four TaskNames: 0, 1, 2, 3. These TaskNames describe the partitions but do not list all of the - * SystemStreamPartitions, which allows new SystemStreamPartitions to be added later without changing - * the definition of the TaskNames, assuming these new SystemStreamPartitions do not have more than - * four partitions. On the other hand, a SystemStreamPartitionGrouper that wanted each SystemStreamPartition - * to be its own, unique group would use the SystemStreamPartition's entire description to generate - * the TaskNames. - */ -public interface SystemStreamPartitionGrouper { - public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps); -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java deleted file mode 100644 index 10ac6e2..0000000 --- a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container; - -import org.apache.samza.config.Config; - -/** - * Return an instance a SystemStreamPartitionGrouper per the particular implementation - */ -public interface SystemStreamPartitionGrouperFactory { - public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config); -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/TaskName.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/container/TaskName.java b/samza-api/src/main/java/org/apache/samza/container/TaskName.java index 13a1206..0833586 100644 --- a/samza-api/src/main/java/org/apache/samza/container/TaskName.java +++ b/samza-api/src/main/java/org/apache/samza/container/TaskName.java @@ -20,7 +20,7 @@ package org.apache.samza.container; /** * A unique identifier of a set of a SystemStreamPartitions that have been grouped by - * a {@link org.apache.samza.container.SystemStreamPartitionGrouper}. The + * a {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}. The * SystemStreamPartitionGrouper determines the TaskName for each set it creates. 
*/ public class TaskName implements Comparable<TaskName> { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java new file mode 100644 index 0000000..f374c4b --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.stream; + +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Map; +import java.util.Set; + +/** + * Group a set of SystemStreamPartitions into logical taskNames that share a common characteristic, defined + * by the implementation. Each taskName has a key that uniquely describes what sets may be in it, but does + * not generally enumerate the elements of those sets. 
For example, a SystemStreamPartitionGrouper that + * groups SystemStreamPartitions (each with 4 partitions) by their partition, would end up generating + * four TaskNames: 0, 1, 2, 3. These TaskNames describe the partitions but do not list all of the + * SystemStreamPartitions, which allows new SystemStreamPartitions to be added later without changing + * the definition of the TaskNames, assuming these new SystemStreamPartitions do not have more than + * four partitions. On the other hand, a SystemStreamPartitionGrouper that wanted each SystemStreamPartition + * to be its own, unique group would use the SystemStreamPartition's entire description to generate + * the TaskNames. + */ +public interface SystemStreamPartitionGrouper { + public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps); +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java new file mode 100644 index 0000000..6c0b12d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.stream; + +import org.apache.samza.config.Config; + +/** + * Return an instance a SystemStreamPartitionGrouper per the particular implementation + */ +public interface SystemStreamPartitionGrouperFactory { + public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config); +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index f84aeea..3b6685e 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -19,7 +19,7 @@ package org.apache.samza.config -import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory +import org.apache.samza.container.grouper.stream.GroupByPartitionFactory object JobConfig { // job config constants http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala 
deleted file mode 100644 index a8c93ac..0000000 --- a/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container - -/** - * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of - * {@link org.apache.samza.container.SystemStreamPartitionGrouper}, we can then map those groupings onto - * the {@link org.apache.samza.container.SamzaContainer}s on which they will run. This class takes - * those groupings-of-SSPs and groups them together on which container each should run on. A simple - * implementation could assign each TaskNamesToSystemStreamPartition to a separate container. More - * advanced implementations could examine the TaskNamesToSystemStreamPartition to group by them - * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc. 
- */ -trait SystemStreamPartitionTaskNameGrouper { - /** - * Group TaskNamesToSystemStreamPartitions onto the containers they will share - * - * @param taskNames Pre-grouped SSPs - * @return Mapping of container ID to set if TaskNames it will run - */ - def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala new file mode 100644 index 0000000..44e95fc --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.container.grouper.stream + +import org.apache.samza.container.TaskName +import java.util +import org.apache.samza.system.SystemStreamPartition +import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ +import org.apache.samza.config.Config + +/** + * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being + * the string representation of the Partition. + */ +class GroupByPartition extends SystemStreamPartitionGrouper { + override def group(ssps: util.Set[SystemStreamPartition]) = { + ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) ) + .map(r => r._1 -> r._2.asJava) + } +} + +class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory { + override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala new file mode 100644 index 0000000..3c0acad --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.stream + +import org.apache.samza.container.TaskName +import java.util +import org.apache.samza.system.SystemStreamPartition +import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ +import org.apache.samza.config.Config + +/** + * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each + * SystemStreamPartition into its own group, with the key being the string representation of the SystemStringPartition + */ +class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper { + override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy({ s=> new TaskName(s.toString)}).map(r => r._1 -> r._2.asJava) +} + +class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory { + override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala new file mode 100644 index 0000000..7a3ba46 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala @@ 
-0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.task + +import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions} +import org.apache.samza.system.SystemStreamPartition + +/** + * Group the SSP taskNames by dividing the number of taskNames into the number of containers (n) and assigning n taskNames + * to each container as returned by iterating over the keys in the map of taskNames (whatever that ordering happens to be). + * No consideration is given towards locality, even distribution of aggregate SSPs within a container, even distribution + * of the number of taskNames between containers, etc. 
+ */ +class GroupByContainerCount(numContainers:Int) extends TaskNameGrouper { + require(numContainers > 0, "Must have at least one container") + + override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = { + val keySize = taskNames.keySet.size + require(keySize > 0, "Must have some SSPs to group, but found none") + + // Iterate through the taskNames, round-robining them per container + val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap + var idx = 0 + for(taskName <- taskNames.iterator) { + val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above + idx = (idx + 1) % numContainers + + currMap += taskName + } + + byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap + } +} + http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala new file mode 100644 index 0000000..46e75b1 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.task + +import org.apache.samza.container.TaskNamesToSystemStreamPartitions + +/** + * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of + * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}, we can then map those groupings onto + * the {@link org.apache.samza.container.SamzaContainer}s on which they will run. This class takes + * those groupings-of-SSPs and groups them together on which container each should run on. A simple + * implementation could assign each TaskNamesToSystemStreamPartition to a separate container. More + * advanced implementations could examine the TaskNamesToSystemStreamPartition to group by them + * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc. 
+ */ +trait TaskNameGrouper { + /** + * Group TaskNamesToSystemStreamPartitions onto the containers they will share + * + * @param taskNames Pre-grouped SSPs + * @return Mapping of container ID to set if TaskNames it will run + */ + def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala deleted file mode 100644 index 223862f..0000000 --- a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.container.systemstreampartition.groupers - -import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperFactory, SystemStreamPartitionGrouper} -import java.util -import org.apache.samza.system.SystemStreamPartition -import scala.collection.JavaConverters._ -import scala.collection.JavaConversions._ -import org.apache.samza.config.Config - -/** - * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being - * the string representation of the Partition. - */ -class GroupByPartition extends SystemStreamPartitionGrouper { - override def group(ssps: util.Set[SystemStreamPartition]) = { - ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) ) - .map(r => r._1 -> r._2.asJava) - } -} - -class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory { - override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala deleted file mode 100644 index a2bcfee..0000000 --- a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container.systemstreampartition.groupers - -import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperFactory, SystemStreamPartitionGrouper} -import java.util -import org.apache.samza.system.SystemStreamPartition -import scala.collection.JavaConverters._ -import scala.collection.JavaConversions._ -import org.apache.samza.config.Config - -/** - * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each - * SystemStreamPartition into its own group, with the key being the string representation of the SystemStringPartition - */ -class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper { - override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy({ s=> new TaskName(s.toString)}).map(r => r._1 -> r._2.asJava) -} - -class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory { - override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala deleted file mode 100644 index 7913294..0000000 --- a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container.systemstreampartition.taskname.groupers - -import org.apache.samza.container.{TaskName, SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions} -import org.apache.samza.system.SystemStreamPartition - -/** - * Group the SSP taskNames by dividing the number of taskNames into the number of containers (n) and assigning n taskNames - * to each container as returned by iterating over the keys in the map of taskNames (whatever that ordering happens to be). 
- * No consideration is given towards locality, even distribution of aggregate SSPs within a container, even distribution - * of the number of taskNames between containers, etc. - */ -class SimpleSystemStreamPartitionTaskNameGrouper(numContainers:Int) extends SystemStreamPartitionTaskNameGrouper { - require(numContainers > 0, "Must have at least one container") - - override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = { - val keySize = taskNames.keySet.size - require(keySize > 0, "Must have some SSPs to group, but found none") - - // Iterate through the taskNames, round-robining them per container - val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap - var idx = 0 - for(taskName <- taskNames.iterator) { - val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above - idx = (idx + 1) % numContainers - - currMap += taskName - } - - byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap - } -} - http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 32c2647..16ad5a2 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -32,13 +32,14 @@ import org.apache.samza.config.Config import org.apache.samza.config.StorageConfig.Config2Storage import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory -import 
org.apache.samza.container.systemstreampartition.taskname.groupers.SimpleSystemStreamPartitionTaskNameGrouper -import org.apache.samza.container.{TaskName, SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions, SystemStreamPartitionGrouperFactory} import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.system.{SystemStreamPartition, SystemFactory, StreamMetadataCache, SystemStream} import scala.collection.JavaConversions._ import scala.collection +import org.apache.samza.container.TaskName +import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory +import org.apache.samza.container.TaskNamesToSystemStreamPartitions +import org.apache.samza.container.grouper.task.GroupByContainerCount object Util extends Logging { val random = new Random @@ -148,7 +149,7 @@ object Util extends Logging { info("Assigning " + sspTaskNames.keySet.size + " SystemStreamPartitions taskNames to " + containerCount + " containers.") // Here is where we should put in a pluggable option for the SSPTaskNameGrouper for locality, load-balancing, etc. 
- val sspTaskNameGrouper = new SimpleSystemStreamPartitionTaskNameGrouper(containerCount) + val sspTaskNameGrouper = new GroupByContainerCount(containerCount) val containersToTaskNames = sspTaskNameGrouper.groupTaskNames(sspTaskNames).toMap http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala deleted file mode 100644 index 3032b00..0000000 --- a/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.container - -import org.apache.samza.Partition -import org.apache.samza.system.SystemStreamPartition -import org.junit.Test -import java.util.HashSet -import java.util.Map -import java.util.Set -import org.junit.Assert.assertEquals -import org.junit.Assert.assertTrue -import java.util.Collections - -object SystemStreamPartitionGrouperTestBase { - val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0)) - val aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1)) - val aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2)) - val ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1)) - val ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2)) - val ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0)) - val allSSPs = new HashSet[SystemStreamPartition] - Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0) -} - -abstract class SystemStreamPartitionGrouperTestBase { - def getGrouper: SystemStreamPartitionGrouper - - @Test - def emptySetReturnsEmptyMap { - val grouper: SystemStreamPartitionGrouper = getGrouper - val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(new HashSet[SystemStreamPartition]) - assertTrue(result.isEmpty) - } - - def verifyGroupGroupsCorrectly(input: Set[SystemStreamPartition], output: Map[TaskName, Set[SystemStreamPartition]]) { - val grouper: SystemStreamPartitionGrouper = getGrouper - val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input) - assertEquals(output, result) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala 
b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala new file mode 100644 index 0000000..47d716e --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.container.grouper.stream + +import org.apache.samza.Partition +import org.apache.samza.system.SystemStreamPartition +import org.junit.Test +import java.util.HashSet +import java.util.Map +import java.util.Set +import org.junit.Assert._ +import java.util.Collections +import org.apache.samza.container.TaskName + +object GroupByTestBase { + val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0)) + val aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1)) + val aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2)) + val ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1)) + val ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2)) + val ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0)) + val allSSPs = new HashSet[SystemStreamPartition] + Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0) +} + +abstract class GroupByTestBase { + def getGrouper: SystemStreamPartitionGrouper + + @Test + def emptySetReturnsEmptyMap { + val grouper: SystemStreamPartitionGrouper = getGrouper + val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(new HashSet[SystemStreamPartition]) + assertTrue(result.isEmpty) + } + + def verifyGroupGroupsCorrectly(input: Set[SystemStreamPartition], output: Map[TaskName, Set[SystemStreamPartition]]) { + val grouper: SystemStreamPartitionGrouper = getGrouper + val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input) + assertEquals(output, result) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 
b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala new file mode 100644 index 0000000..2fa718c --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.container.grouper.stream + +import org.apache.samza.container.TaskName +import scala.collection.JavaConverters._ +import org.junit.Test + +class TestGroupByPartition extends GroupByTestBase { + import GroupByTestBase._ + + val expected /* from base class provided set */ = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava, + new TaskName("Partition 1") -> Set(aa1, ab1).asJava, + new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava + + override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition + + @Test def groupingWorks() { + verifyGroupGroupsCorrectly(allSSPs, expected) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala new file mode 100644 index 0000000..8da0595 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.stream + +import org.apache.samza.container.TaskName +import scala.collection.JavaConverters._ +import org.junit.Test + +class TestGroupBySystemStreamPartition extends GroupByTestBase { + import GroupByTestBase._ + + // Building manually to avoid just duplicating a logic potential logic error here and there + val expected /* from base class provided set */ = Map(new TaskName(aa0.toString) -> Set(aa0).asJava, + new TaskName(aa1.toString) -> Set(aa1).asJava, + new TaskName(aa2.toString) -> Set(aa2).asJava, + new TaskName(ab1.toString) -> Set(ab1).asJava, + new TaskName(ab2.toString) -> Set(ab2).asJava, + new TaskName(ac0.toString) -> Set(ac0).asJava).asJava + + override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition + + @Test def groupingWorks() { + verifyGroupGroupsCorrectly(allSSPs, expected) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala new file mode 100644 index 0000000..20f41a8 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.task + +import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions} +import org.apache.samza.system.SystemStreamPartition +import org.junit.Assert._ +import org.junit.Test + +class TestGroupByContainerCount { + val emptySSPSet = Set[SystemStreamPartition]() + + @Test + def weGetAsExactlyManyGroupsAsWeAskFor() { + // memoize the maps used in the test to avoid an O(n^3) loop + val tntsspCache = scala.collection.mutable.Map[Int, TaskNamesToSystemStreamPartitions]() + + def tntsspOfSize(size:Int) = { + def getMap(size:Int) = TaskNamesToSystemStreamPartitions((0 until size).map(z => new TaskName("tn" + z) -> emptySSPSet).toMap) + + tntsspCache.getOrElseUpdate(size, getMap(size)) + } + + val maxTNTSSPSize = 1000 + val maxNumGroups = 140 + for(numGroups <- 1 to maxNumGroups) { + val grouper = new GroupByContainerCount(numGroups) + + for (tntsspSize <- numGroups to maxTNTSSPSize) { + val map = tntsspOfSize(tntsspSize) + assertEquals(tntsspSize, map.size) + + val grouped = grouper.groupTaskNames(map) + assertEquals("Asked for " + numGroups + " but got " + grouped.size, numGroups, grouped.size) + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala deleted file mode 100644 index 733be20..0000000 --- a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.container.systemstreampartition.groupers - -import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperTestBase, SystemStreamPartitionGrouper} -import scala.collection.JavaConverters._ -import org.junit.Test - -class TestGroupByPartition extends SystemStreamPartitionGrouperTestBase { - import SystemStreamPartitionGrouperTestBase._ - - val expected /* from base class provided set */ = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava, - new TaskName("Partition 1") -> Set(aa1, ab1).asJava, - new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava - - override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition - - @Test def groupingWorks() { - verifyGroupGroupsCorrectly(allSSPs, expected) - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala deleted file mode 100644 index e9c15a5..0000000 --- a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container.systemstreampartition.groupers - -import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperTestBase, SystemStreamPartitionGrouper} -import scala.collection.JavaConverters._ -import org.junit.Test - -class TestGroupBySystemStreamPartition extends SystemStreamPartitionGrouperTestBase { - import SystemStreamPartitionGrouperTestBase._ - - // Building manually to avoid just duplicating a logic potential logic error here and there - val expected /* from base class provided set */ = Map(new TaskName(aa0.toString) -> Set(aa0).asJava, - new TaskName(aa1.toString) -> Set(aa1).asJava, - new TaskName(aa2.toString) -> Set(aa2).asJava, - new TaskName(ab1.toString) -> Set(ab1).asJava, - new TaskName(ab2.toString) -> Set(ab2).asJava, - new TaskName(ac0.toString) -> Set(ac0).asJava).asJava - - override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition - - @Test def groupingWorks() { - verifyGroupGroupsCorrectly(allSSPs, expected) - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala 
b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala deleted file mode 100644 index 7ea09cd..0000000 --- a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.container.systemstreampartition.taskname.groupers - -import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions} -import org.apache.samza.system.SystemStreamPartition -import org.junit.Assert._ -import org.junit.Test - -class TestSimpleSystemStreamPartitionTaskNameGrouper { - val emptySSPSet = Set[SystemStreamPartition]() - - @Test - def weGetAsExactlyManyGroupsAsWeAskFor() { - // memoize the maps used in the test to avoid an O(n^3) loop - val tntsspCache = scala.collection.mutable.Map[Int, TaskNamesToSystemStreamPartitions]() - - def tntsspOfSize(size:Int) = { - def getMap(size:Int) = TaskNamesToSystemStreamPartitions((0 until size).map(z => new TaskName("tn" + z) -> emptySSPSet).toMap) - - tntsspCache.getOrElseUpdate(size, getMap(size)) - } - - val maxTNTSSPSize = 1000 - val maxNumGroups = 140 - for(numGroups <- 1 to maxNumGroups) { - val grouper = new SimpleSystemStreamPartitionTaskNameGrouper(numGroups) - - for (tntsspSize <- numGroups to maxTNTSSPSize) { - val map = tntsspOfSize(tntsspSize) - assertEquals(tntsspSize, map.size) - - val grouped = grouper.groupTaskNames(map) - assertEquals("Asked for " + numGroups + " but got " + grouped.size, numGroups, grouped.size) - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index cddee13..34fe6dd 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -42,7 +42,7 @@ import org.junit.Assert._ import org.junit.{AfterClass, 
BeforeClass, Test} import scala.collection.JavaConversions._ import scala.collection._ -import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory +import org.apache.samza.container.grouper.stream.GroupByPartitionFactory object TestKafkaCheckpointManager { val zkConnect: String = TestZKUtils.zookeeperConnect
