Add SimpleGroupByContainerCount task grouper and its factory
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8cbf0140 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8cbf0140 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8cbf0140 Branch: refs/heads/samza-standalone Commit: 8cbf01407844eab187ea39a222ee7a229b091422 Parents: f15b907 Author: navina <[email protected]> Authored: Fri Dec 23 17:01:04 2016 -0800 Committer: navina <[email protected]> Committed: Fri Dec 23 17:01:04 2016 -0800 ---------------------------------------------------------------------- .../task/SimpleGroupByContainerCount.java | 56 ++++++++++++++++++++ .../SimpleGroupByContainerCountFactory.java | 34 ++++++++++++ 2 files changed, 90 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8cbf0140/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java new file mode 100644 index 0000000..c9489f7 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java @@ -0,0 +1,56 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.container.grouper.task;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;


/**
 * A {@link BalancingTaskNameGrouper} that spreads tasks over a fixed number of containers in
 * round-robin order.
 *
 * <p>Tasks are sorted by {@link TaskModel}'s natural ordering, and the i-th sorted task is assigned
 * to container {@code i % containerCount}. The same set of tasks therefore always yields the same
 * grouping. Container ids run from {@code 0} to {@code containerCount - 1}. If there are fewer
 * tasks than containers, the surplus containers are still created, with an empty task map.
 */
public class SimpleGroupByContainerCount implements BalancingTaskNameGrouper {
  /** Number of containers to distribute tasks over; validated to be positive. */
  private final int containerCount;

  /** Creates a grouper that assigns every task to a single container (id {@code 0}). */
  public SimpleGroupByContainerCount() {
    this(1);
  }

  /**
   * Creates a grouper that spreads tasks over {@code containerCount} containers.
   *
   * @param containerCount number of containers to create; must be at least 1
   * @throws IllegalArgumentException if {@code containerCount} is not positive
   */
  public SimpleGroupByContainerCount(int containerCount) {
    if (containerCount <= 0) {
      throw new IllegalArgumentException("Must have at least one container");
    }
    this.containerCount = containerCount;
  }

  /**
   * Assigns {@code tasks} to containers in round-robin order of their sorted {@link TaskModel}s.
   *
   * @param tasks the tasks to group
   * @return an unmodifiable set holding one {@link ContainerModel} per container id
   *     {@code 0..containerCount-1}
   */
  @Override
  public Set<ContainerModel> group(Set<TaskModel> tasks) {
    // Sort first so the assignment does not depend on the input set's iteration order.
    List<TaskModel> sortedTasks = new ArrayList<>(tasks);
    Collections.sort(sortedTasks);

    // A List of maps avoids creating a raw, unchecked generic array (new Map[n]).
    List<Map<TaskName, TaskModel>> taskGroups = new ArrayList<>(containerCount);
    for (int i = 0; i < containerCount; i++) {
      taskGroups.add(new HashMap<>());
    }
    for (int i = 0; i < sortedTasks.size(); i++) {
      TaskModel task = sortedTasks.get(i);
      taskGroups.get(i % containerCount).put(task.getTaskName(), task);
    }

    Set<ContainerModel> containerModels = new HashSet<>();
    for (int containerId = 0; containerId < containerCount; containerId++) {
      containerModels.add(new ContainerModel(containerId, taskGroups.get(containerId)));
    }
    return Collections.unmodifiableSet(containerModels);
  }

  /**
   * Recomputes the grouping for {@code tasks}.
   *
   * <p>This grouper keeps no record of earlier assignments, so {@code localityManager} is not
   * consulted. The result is exactly {@link #group(Set)}, which is deterministic for a given task
   * set. A non-null result is returned so that callers dispatching to {@code balance} can use it.
   *
   * @param tasks the tasks to group
   * @param localityManager ignored by this implementation
   * @return the same grouping as {@link #group(Set)}
   */
  @Override
  public Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) {
    return group(tasks);
  }
}
 http://git-wip-us.apache.org/repos/asf/samza/blob/8cbf0140/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java new file mode 100644 index 0000000..02918f6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.task; + + +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; + + +/** + * Factory to build the GroupByContainerCount class. + */ +public class SimpleGroupByContainerCountFactory implements TaskNameGrouperFactory { + @Override + public TaskNameGrouper build(Config config) { + return new SimpleGroupByContainerCount(); + } +} \ No newline at end of file
