Repository: samza Updated Branches: refs/heads/samza-fluent-api-v1 ea37b7463 -> 03bacdc60
SAMZA-1086; New Grouper for ZK based standalone. SAMZA-1086. Create new grouper with support for arbitrary container ids. Add support for this list of container IDs in the JobModelManager. Author: Boris Shkolnik <[email protected]> Reviewers: Xinyu Liu <[email protected]>, Fred Ji <[email protected]> Closes #52 from sborya/JobModel Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/daaad7b8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/daaad7b8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/daaad7b8 Branch: refs/heads/samza-fluent-api-v1 Commit: daaad7b84837a5fc7f995234248d3349a7f6f1d2 Parents: c249443 Author: Boris Shkolnik <[email protected]> Authored: Thu Feb 16 11:40:12 2017 -0800 Committer: navina <[email protected]> Committed: Thu Feb 16 11:40:12 2017 -0800 ---------------------------------------------------------------------- .../grouper/task/GroupByContainerIds.java | 97 +++++++++ .../task/GroupByContainerIdsFactory.java | 34 +++ .../container/grouper/task/TaskNameGrouper.java | 5 + .../standalone/StandaloneJobCoordinator.java | 2 +- .../samza/coordinator/JobModelManager.scala | 18 +- .../grouper/task/TestGroupByContainerIds.java | 207 +++++++++++++++++++ 6 files changed, 355 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/daaad7b8/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java new file mode 100644 index 0000000..6d3f673 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container.grouper.task; + +import java.util.Arrays; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +/** + * Simple grouper. + * It exposes two group methods - one that assumes sequential container numbers and one that gets a set of container + * IDs as an argument. Please note - this first implementation ignores locality information. + */ +public class GroupByContainerIds implements TaskNameGrouper { + private final int startContainerCount; + public GroupByContainerIds(int count) { + this.startContainerCount = count; + } + + @Override + public Set<ContainerModel> group(Set<TaskModel> tasks) { + if (tasks.isEmpty()) + throw new IllegalArgumentException("cannot group an empty set"); + + if (startContainerCount > tasks.size()) + throw new IllegalArgumentException("number of containers=" + startContainerCount + " is bigger than number of tasks=" + tasks.size()); + + List<Integer> containerIds = new ArrayList<>(startContainerCount); + for (int i = 0; i < startContainerCount; i++) { + containerIds.add(i); + } + return group(tasks, containerIds); + } + + public Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) { + if (tasks.isEmpty()) + throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays + .toString(containersIds.toArray())); + + if (containersIds.size() > tasks.size()) + throw new IllegalArgumentException("number of containers " + containersIds.size() + " is bigger than number of tasks " + tasks.size()); + + if (containersIds == null) + return this.group(tasks); + + int containerCount = containersIds.size(); + + // Sort tasks by taskName. + List<TaskModel> sortedTasks = new ArrayList<>(tasks); + Collections.sort(sortedTasks); + + // Map every task to a container in round-robin fashion. + Map<TaskName, TaskModel>[] taskGroups = new Map[containerCount]; + for (int i = 0; i < containerCount; i++) { + taskGroups[i] = new HashMap<>(); + } + for (int i = 0; i < sortedTasks.size(); i++) { + TaskModel tm = sortedTasks.get(i); + taskGroups[i % containerCount].put(tm.getTaskName(), tm); + } + + // Convert to a Set of ContainerModel + Set<ContainerModel> containerModels = new HashSet<>(); + for (int i = 0; i < containerCount; i++) { + containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i])); + } + + return Collections.unmodifiableSet(containerModels); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/daaad7b8/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIdsFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIdsFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIdsFactory.java new file mode 100644 index 0000000..0383d00 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIdsFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.task; + + +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; + + +/** + * Factory to build the GroupByContainerCount class. + */ +public class GroupByContainerIdsFactory implements TaskNameGrouperFactory { + @Override + public TaskNameGrouper build(Config config) { + return new GroupByContainerIds(new JobConfig(config).getContainerCount()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/daaad7b8/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java index 59a3237..d06bf62 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java @@ -18,6 +18,7 @@ */ package org.apache.samza.container.grouper.task; +import java.util.List; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; @@ -50,4 +51,8 @@ public interface TaskNameGrouper { * @return Set of containers, which contain the tasks that were passed in. */ Set<ContainerModel> group(Set<TaskModel> tasks); + + default Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) { + return group(tasks); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/daaad7b8/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java index 1401725..46dbf30 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java @@ -105,7 +105,7 @@ public class StandaloneJobCoordinator implements JobCoordinator { * TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator * (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper) */ - this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null); + this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null, null); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/daaad7b8/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 7f5d05d..14d5dff 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -106,7 +106,7 @@ object JobModelManager extends Logging { } } - val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor) + val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor, null) createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions) jobCoordinator @@ -121,8 +121,9 @@ object JobModelManager extends Logging { changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, streamMetadataCache: StreamMetadataCache, - streamPartitionCountMonitor: StreamPartitionCountMonitor) = { - val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache) + streamPartitionCountMonitor: StreamPartitionCountMonitor, + containerIds: java.util.List[Integer]) = { + val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache, containerIds) jobModelRef.set(jobModel) val server = new HttpServer @@ -188,7 +189,8 @@ object JobModelManager extends Logging { private def initializeJobModel(config: Config, changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, - streamMetadataCache: StreamMetadataCache): JobModel = { + streamMetadataCache: StreamMetadataCache, + containerIds: java.util.List[Integer]): JobModel = { // Do grouping to fetch TaskName to SSP mapping val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache) val grouper = getSystemStreamPartitionGrouper(config) @@ -216,7 +218,8 @@ object JobModelManager extends Logging { def jobModelGenerator(): JobModel = refreshJobModel(config, groups, previousChangelogMapping, - localityManager) + localityManager, + containerIds) val jobModel = jobModelGenerator() @@ -248,7 +251,8 @@ object JobModelManager extends Logging { private def refreshJobModel(config: Config, groups: util.Map[TaskName, util.Set[SystemStreamPartition]], previousChangelogMapping: util.Map[TaskName, Integer], - localityManager: LocalityManager): JobModel = { + localityManager: LocalityManager, + containerIds: java.util.List[Integer]): JobModel = { // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change // mapping. @@ -280,7 +284,7 @@ object JobModelManager extends Logging { if (containerGrouper.isInstanceOf[BalancingTaskNameGrouper]) containerGrouper.asInstanceOf[BalancingTaskNameGrouper].balance(taskModels, localityManager) else - containerGrouper.group(taskModels) + containerGrouper.group(taskModels, containerIds) } val containerMap = asScalaSet(containerModels).map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap http://git-wip-us.apache.org/repos/asf/samza/blob/daaad7b8/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java new file mode 100644 index 0000000..82f2b7a --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container.grouper.task; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.LocalityManager; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.samza.container.mock.ContainerMocks.generateTaskModels; +import static org.apache.samza.container.mock.ContainerMocks.getTaskModel; +import static org.apache.samza.container.mock.ContainerMocks.getTaskName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestGroupByContainerIds { + private TaskAssignmentManager taskAssignmentManager; + private LocalityManager localityManager; + + @Before + public void setup() { + taskAssignmentManager = mock(TaskAssignmentManager.class); + localityManager = mock(LocalityManager.class); + when(localityManager.getTaskAssignmentManager()).thenReturn(taskAssignmentManager); + + + } + + private Config buildConfigForContainerCount(int count) { + Map<String, String> map = new HashMap<>(); + map.put("job.container.count", String.valueOf(count)); + return new MapConfig(map); + } + + private TaskNameGrouper buildSimpleGrouper() { + return buildSimpleGrouper(1); + } + private TaskNameGrouper buildSimpleGrouper(int containerCount) { + return new GroupByContainerIdsFactory().build(buildConfigForContainerCount(containerCount)); + } + + @Test(expected = IllegalArgumentException.class) + public void testGroupEmptyTasks() { + buildSimpleGrouper(1).group(new HashSet()); + } + + @Test(expected = IllegalArgumentException.class) + public void testGroupFewerTasksThanContainers() { + Set<TaskModel> taskModels = new HashSet<>(); + taskModels.add(getTaskModel(1)); + buildSimpleGrouper(2).group(taskModels); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGrouperResultImmutable() { + Set<TaskModel> taskModels = generateTaskModels(3); + Set<ContainerModel> containers = buildSimpleGrouper(2).group(taskModels); + containers.remove(containers.iterator().next()); + } + + @Test + public void testGroupHappyPath() { + Set<TaskModel> taskModels = generateTaskModels(5); + + Set<ContainerModel> containers = buildSimpleGrouper(2).group(taskModels); + + Map<Integer, ContainerModel> containersMap = new HashMap<>(); + for (ContainerModel container : containers) { + containersMap.put(container.getContainerId(), container); + } + + assertEquals(2, containers.size()); + ContainerModel container0 = containersMap.get(0); + ContainerModel container1 = containersMap.get(1); + assertNotNull(container0); + assertNotNull(container1); + assertEquals(0, container0.getContainerId()); + assertEquals(1, container1.getContainerId()); + assertEquals(3, container0.getTasks().size()); + assertEquals(2, container1.getTasks().size()); + assertTrue(container0.getTasks().containsKey(getTaskName(0))); + assertTrue(container0.getTasks().containsKey(getTaskName(2))); + assertTrue(container0.getTasks().containsKey(getTaskName(4))); + assertTrue(container1.getTasks().containsKey(getTaskName(1))); + assertTrue(container1.getTasks().containsKey(getTaskName(3))); + } + + @Test + public void testGroupHappyPathWithListOfContainers() { + Set<TaskModel> taskModels = generateTaskModels(5); + + List<Integer> containerIds = new ArrayList<Integer>() { + { + add(4); + add(2); + } + }; + + Set<ContainerModel> containers = buildSimpleGrouper().group(taskModels, containerIds); + + Map<Integer, ContainerModel> containersMap = new HashMap<>(); + for (ContainerModel container : containers) { + containersMap.put(container.getContainerId(), container); + } + + assertEquals(2, containers.size()); + ContainerModel container0 = containersMap.get(4); + ContainerModel container1 = containersMap.get(2); + assertNotNull(container0); + assertNotNull(container1); + assertEquals(4, container0.getContainerId()); + assertEquals(2, container1.getContainerId()); + assertEquals(3, container0.getTasks().size()); + assertEquals(2, container1.getTasks().size()); + assertTrue(container0.getTasks().containsKey(getTaskName(0))); + assertTrue(container0.getTasks().containsKey(getTaskName(2))); + assertTrue(container0.getTasks().containsKey(getTaskName(4))); + assertTrue(container1.getTasks().containsKey(getTaskName(1))); + assertTrue(container1.getTasks().containsKey(getTaskName(3))); + } + + + @Test + public void testGroupManyTasks() { + Set<TaskModel> taskModels = generateTaskModels(21); + + List<Integer> containerIds = new ArrayList<Integer>() { + { + add(4); + add(2); + } + }; + + + Set<ContainerModel> containers = buildSimpleGrouper().group(taskModels, containerIds); + + Map<Integer, ContainerModel> containersMap = new HashMap<>(); + for (ContainerModel container : containers) { + containersMap.put(container.getContainerId(), container); + } + + assertEquals(2, containers.size()); + ContainerModel container0 = containersMap.get(4); + ContainerModel container1 = containersMap.get(2); + assertNotNull(container0); + assertNotNull(container1); + assertEquals(4, container0.getContainerId()); + assertEquals(2, container1.getContainerId()); + assertEquals(11, container0.getTasks().size()); + assertEquals(10, container1.getTasks().size()); + + // NOTE: tasks are sorted lexicographically, so the container assignment + // can seem odd, but the consistency is the key focus + assertTrue(container0.getTasks().containsKey(getTaskName(0))); + assertTrue(container0.getTasks().containsKey(getTaskName(10))); + assertTrue(container0.getTasks().containsKey(getTaskName(12))); + assertTrue(container0.getTasks().containsKey(getTaskName(14))); + assertTrue(container0.getTasks().containsKey(getTaskName(16))); + assertTrue(container0.getTasks().containsKey(getTaskName(18))); + assertTrue(container0.getTasks().containsKey(getTaskName(2))); + assertTrue(container0.getTasks().containsKey(getTaskName(3))); + assertTrue(container0.getTasks().containsKey(getTaskName(5))); + assertTrue(container0.getTasks().containsKey(getTaskName(7))); + assertTrue(container0.getTasks().containsKey(getTaskName(9))); + + assertTrue(container1.getTasks().containsKey(getTaskName(1))); + assertTrue(container1.getTasks().containsKey(getTaskName(11))); + assertTrue(container1.getTasks().containsKey(getTaskName(13))); + assertTrue(container1.getTasks().containsKey(getTaskName(15))); + assertTrue(container1.getTasks().containsKey(getTaskName(17))); + assertTrue(container1.getTasks().containsKey(getTaskName(19))); + assertTrue(container1.getTasks().containsKey(getTaskName(20))); + assertTrue(container1.getTasks().containsKey(getTaskName(4))); + assertTrue(container1.getTasks().containsKey(getTaskName(6))); + assertTrue(container1.getTasks().containsKey(getTaskName(8))); + } +}
