Updated Branches: refs/heads/master 01caadbca -> b694e7ade
SAMZA-82: Not use maximum number of partitions when initializing streams.

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/b694e7ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/b694e7ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/b694e7ad

Branch: refs/heads/master
Commit: b694e7adecb8dc326e8b96cc51b32603ffd4d9f2
Parents: 01caadb
Author: Jakob Homan <[email protected]>
Authored: Thu Dec 12 16:37:04 2013 -0800
Committer: Jakob Homan <[email protected]>
Committed: Thu Dec 12 16:37:04 2013 -0800

----------------------------------------------------------------------
 .../org/apache/samza/job/CommandBuilder.java     | 20 ++---
 .../samza/config/ShellCommandConfig.scala        |  5 +-
 .../org/apache/samza/config/TaskConfig.scala     |  2 -
 .../apache/samza/container/SamzaContainer.scala  | 28 +++---
 .../apache/samza/job/ShellCommandBuilder.scala   |  7 +-
 .../samza/job/local/LocalJobFactory.scala        |  9 +-
 .../main/scala/org/apache/samza/util/Util.scala  | 91 ++++++++++++++-----
 .../scala/org/apache/samza/util/TestUtil.scala   | 92 ++++++++++++++++++++
 .../kafka/KafkaCheckpointManagerFactory.scala    |  4 +-
 .../job/yarn/SamzaAppMasterTaskManager.scala     | 30 ++-----
 .../yarn/TestSamzaAppMasterTaskManager.scala     | 45 +++++-----
 11 files changed, 224 insertions(+), 109 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
index 934423b..5ec6433 100644
--- a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
@@ -19,23 +19,22 @@
 package org.apache.samza.job;
 
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStreamPartition;
+
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-
 public abstract class CommandBuilder {
-  protected Set<Partition> partitions;
-  protected int totalPartitions;
+  protected Set<SystemStreamPartition> systemStreamPartitions;
   protected String name;
   protected Config config;
 
-  public CommandBuilder setPartitions(Set<Partition> partitions) {
-    this.partitions = partitions;
+  public CommandBuilder setStreamPartitions(Set<SystemStreamPartition> ssp) {
+    this.systemStreamPartitions = ssp;
     return this;
   }
-
+
   /**
    * @param name
    *          associated with a specific instantiation of a TaskRunner.
@@ -51,11 +50,6 @@ public abstract class CommandBuilder { return this; } - public CommandBuilder setTotalPartitions(int totalPartitions) { - this.totalPartitions = totalPartitions; - return this; - } - public abstract String buildCommand(); public abstract Map<String, String> buildEnvironment(); http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala index b6efe46..3e4ab29 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala @@ -26,9 +26,10 @@ object ShellCommandConfig { val ENV_CONFIG = "SAMZA_CONFIG" /** - * A CSV list of partition IDs that a TaskRunner is responsible for (e.g. 0,2,4,6). + * An encoded list of the streams and partitions this container is responsible for. Encoded by + * {@link org.apache.samza.util.Util#createStreamPartitionString} */ - val ENV_PARTITION_IDS = "SAMZA_PARTITION_IDS" + val ENV_SYSTEM_STREAMS = "SAMZA_SYSTEM_STREAMS" /** * The name for a container (either a YARN AM or SamzaContainer) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index 9c4370f..3510f1f 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -19,8 +19,6 @@ package org.apache.samza.config -import org.apache.samza.config.StreamConfig.Config2Stream -import org.apache.samza.SamzaException import org.apache.samza.util.Util import org.apache.samza.system.SystemStream http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index aefec27..9baff44 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -38,7 +38,6 @@ import org.apache.samza.metrics.JmxServer import org.apache.samza.metrics.JvmMetrics import org.apache.samza.metrics.MetricsReporter import org.apache.samza.metrics.MetricsReporterFactory -import org.apache.samza.serializers.Serde import org.apache.samza.serializers.SerdeFactory import org.apache.samza.serializers.SerdeManager import org.apache.samza.storage.StorageEngineFactory @@ -54,7 +53,6 @@ import org.apache.samza.task.ReadableCoordinator import org.apache.samza.system.SystemProducers import org.apache.samza.task.ReadableCollector import org.apache.samza.system.SystemConsumers -import org.apache.samza.system.chooser.MessageChooser import org.apache.samza.system.chooser.MessageChooserFactory import org.apache.samza.system.SystemProducersMetrics import org.apache.samza.system.SystemConsumersMetrics @@ -68,12 +66,11 @@ object 
SamzaContainer extends Logging { val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME) val configStr = System.getenv(ShellCommandConfig.ENV_CONFIG) val config = JsonConfigSerializer.fromJson(configStr) - val partitionIdsCsv = System.getenv(ShellCommandConfig.ENV_PARTITION_IDS) - val partitions = if (partitionIdsCsv.length > 0) { - partitionIdsCsv.split(",") - .map(partitionIdStr => new Partition(partitionIdStr.toInt)) - .toSet - } else { + val encodedStreamsAndPartitions = System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS) + + val partitions = Util.createStreamPartitionsFromString(encodedStreamsAndPartitions) + + if(partitions.isEmpty) { throw new SamzaException("No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.") } @@ -84,9 +81,9 @@ object SamzaContainer extends Logging { } } - def apply(containerName: String, partitions: Set[Partition], config: Config) = { + def apply(containerName: String, inputStreams: Set[SystemStreamPartition], config: Config) = { info("Setting up Samza container: %s" format containerName) - info("Using partitions: %s" format partitions) + info("Using streams and partitions: %s" format inputStreams) info("Using configuration: %s" format config) val registry = new MetricsRegistryMap(containerName) @@ -94,11 +91,8 @@ object SamzaContainer extends Logging { val systemProducersMetrics = new SystemProducersMetrics(registry) val systemConsumersMetrics = new SystemConsumersMetrics(registry) - val inputStreams = config.getInputStreams val inputSystems = inputStreams.map(_.getSystem) - info("Got input streams: %s" format inputStreams) - val systemNames = config.getSystemNames info("Got system names: %s" format systemNames) @@ -336,6 +330,9 @@ object SamzaContainer extends Logging { info("Got commit milliseconds: %s" format taskCommitMs) // Wire up all task-level (unshared) objects. 
+ + val partitions = inputStreams.map(_.getPartition).toSet + val taskInstances = partitions.map(partition => { debug("Setting up task instance: %s" format partition) @@ -394,6 +391,9 @@ object SamzaContainer extends Logging { changeLogSystemStreams = changeLogSystemStreams, storeBaseDir = storeBaseDir) + val inputStreamsForThisPartition = inputStreams.filter(_.getPartition.equals(partition)).map(_.getSystemStream) + info("Assigning SystemStreams " + inputStreamsForThisPartition + " to " + partition) + val taskInstance = new TaskInstance( task = task, partition = partition, @@ -405,7 +405,7 @@ object SamzaContainer extends Logging { checkpointManager = checkpointManager, reporters = reporters, listeners = listeners, - inputStreams = inputStreams, + inputStreams = inputStreamsForThisPartition, resetInputStreams = resetInputStreams, windowMs = taskWindowMs, commitMs = taskCommitMs, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala index c67e46d..b4eaf90 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala @@ -23,16 +23,17 @@ import scala.collection.JavaConversions._ import org.apache.samza.config.ShellCommandConfig import org.apache.samza.config.ShellCommandConfig.Config2ShellCommand import org.apache.samza.config.serializers.JsonConfigSerializer +import org.apache.samza.util.Util class ShellCommandBuilder extends CommandBuilder { def buildCommand() = config.getCommand def buildEnvironment(): java.util.Map[String, String] = { - val parts = if (partitions.size() > 0) partitions.map(_.getPartitionId.toString).reduceLeft(_ + "," + _) else "" - + val streamsAndPartsString = Util.createStreamPartitionString(systemStreamPartitions.toSet) // Java to Scala set conversion + Map( ShellCommandConfig.ENV_CONTAINER_NAME -> name, - ShellCommandConfig.ENV_PARTITION_IDS -> parts, + ShellCommandConfig.ENV_SYSTEM_STREAMS -> streamsAndPartsString, ShellCommandConfig.ENV_CONFIG -> JsonConfigSerializer.toJson(config), ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse("")) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala index ddb119b..e20e7c1 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala @@ -20,14 +20,11 @@ package org.apache.samza.job.local import org.apache.samza.config.TaskConfig._ import org.apache.samza.config.Config -import org.apache.samza.config.SystemConfig._ -import org.apache.samza.config.StreamConfig._ import org.apache.samza.config.ShellCommandConfig._ import org.apache.samza.job.CommandBuilder import org.apache.samza.job.StreamJob import org.apache.samza.job.StreamJobFactory import scala.collection.JavaConversions._ -import org.apache.samza.Partition import grizzled.slf4j.Logging import 
org.apache.samza.SamzaException import org.apache.samza.container.SamzaContainer @@ -37,7 +34,7 @@ import org.apache.samza.job.ShellCommandBuilder class LocalJobFactory extends StreamJobFactory with Logging { def getJob(config: Config): StreamJob = { val taskName = "local-task" - val partitions = Util.getMaxInputStreamPartitions(config) + val partitions = Util.getInputStreamPartitions(config) info("got partitions for job %s" format partitions) @@ -50,11 +47,11 @@ class LocalJobFactory extends StreamJobFactory with Logging { // A command class was specified, so we need to use a process job to // execute the command in its own process. val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder] - + cmdBuilder .setConfig(config) .setName(taskName) - .setPartitions(partitions) + .setStreamPartitions(partitions) val processBuilder = new ProcessBuilder(cmdBuilder.buildCommand.split(" ").toList) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 6b2ec49..04a6dfc 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -20,18 +20,14 @@ package org.apache.samza.util import java.io.File -import java.net.InetAddress -import java.net.UnknownHostException import java.util.Random import grizzled.slf4j.Logging -import org.apache.samza.SamzaException +import org.apache.samza.{Partition, SamzaException} import org.apache.samza.config.Config import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task import scala.collection.JavaConversions._ -import java.util.concurrent.ThreadFactory -import org.apache.samza.system.SystemFactory -import org.apache.samza.system.SystemStream +import org.apache.samza.system.{SystemStreamPartition, SystemAdmin, SystemFactory, SystemStream} object Util extends Logging { val random = new Random @@ -77,15 +73,17 @@ object Util extends Logging { } /** - * Uses config to create SystemAdmin classes for all input stream systems to - * get each input stream's partition count, then returns the maximum count. - * An input stream with two partitions, and a second input stream with four - * partitions would result in this method returning 4. + * For each input stream specified in config, exactly determine its partitions, returning a set of SystemStreamPartitions + * corresponding to them all + * + * @param config Source of truth for systems and inputStreams + * @return Set of SystemStreamPartitions, one for each unique system, stream and partition */ - def getMaxInputStreamPartitions(config: Config) = { - val inputStreams = config.getInputStreams + def getInputStreamPartitions(config: Config): Set[SystemStreamPartition] = { + val systemStreams = config.getInputStreams val systemNames = config.getSystemNames - val systemAdmins = systemNames.map(systemName => { + + val systemAdmins: Map[String, SystemAdmin] = systemNames.map(systemName => { val systemFactoryClassName = config .getSystemFactory(systemName) .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." 
format systemName)) @@ -93,14 +91,12 @@ object Util extends Logging { val systemAdmin = systemFactory.getAdmin(systemName, config) (systemName, systemAdmin) }).toMap - inputStreams.flatMap(systemStream => { - systemAdmins.get(systemStream.getSystem) match { - case Some(sysAdmin) => sysAdmin.getPartitions(systemStream.getStream) - case None => throw new IllegalArgumentException("Could not find a stream admin for system '" + systemStream.getSystem + "'") - } - }).toSet + + def getPartitions(is:SystemStream) = systemAdmins.getOrElse(is.getSystem, throw new IllegalArgumentException("Could not find a stream admin for system '" + is.getSystem + "'")) + .getPartitions(is.getStream).map(p => new SystemStreamPartition(is, p)) + systemStreams.map(getPartitions).flatten } - + /** * Returns a SystemStream object based on the system stream name given. For * example, kafka.topic would return new SystemStream("kafka", "topic"). @@ -121,6 +117,61 @@ object Util extends Logging { } /** + * For specified containerId, create a list of of the streams and partitions that task should handle, + * based on the number of tasks in the job + * + * @param containerId TaskID to determine work for + * @param containerCount Total number of tasks in the job + * @param ssp All SystemStreamPartitions + * @return Collection of streams and partitions for this particular containerId + */ + def getStreamsAndPartitionsForContainer(containerId:Int, containerCount:Int, ssp:Set[SystemStreamPartition]): Set[SystemStreamPartition] = { + ssp.filter(_.getPartition.getPartitionId % containerCount == containerId) + } + + /** + * Serialize a collection of stream-partitions to a string suitable for passing between processes. + * The streams will be grouped by partition. The partition will be separated from the topics by + * a colon (":"), the topics separated by commas (",") and the topic-stream groups by a slash ("/"). + * Ordering of the grouping is not specified. + * + * For example: (A,0),(A,4)(B,0)(B,4)(C,0) could be transformed to: 4:a,b/0:a,b,c + * + * @param sp Stream topics to group into a string + * @return Serialized string of the topics and streams grouped and delimited + */ + def createStreamPartitionString(sp: Set[SystemStreamPartition]): String = { + for(ch <- List(':', ',', '/'); + s <- sp) { + if(s.getStream.contains(ch)) throw new IllegalArgumentException(s + " contains illegal character " + ch) + } + + sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + ":" + z._2.map(y => y.getSystem + "." +y.getStream).mkString(",")).mkString("/") + + } + + /** + * Invert @{list createStreamPartitionString}, building a list of streams and their partitions, + * from the string that function produced. + * + * @param sp Strings and partitions encoded as a stream by the above function + * @return List of string and partition tuples extracted from string. Order is not necessarily preserved. + */ + def createStreamPartitionsFromString(sp:String): Set[SystemStreamPartition] = { + if(sp == null || sp.isEmpty) return Set.empty + + def splitPartitionGroup(pg:String) = { + val split = pg.split(":") // Seems like there should be a more scalar way of doing this + val part = split(0).toInt + val streams = split(1).split(",").toList + + streams.map(s => new SystemStreamPartition(getSystemStreamFromNames(s), new Partition(part))).toSet + } + + sp.split("/").map(splitPartitionGroup(_)).toSet.flatten + } + + /** * Makes sure that an object is not null, and throws a NullPointerException * if it is. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala new file mode 100644 index 0000000..9c225d3 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.util + +import org.junit.Test +import org.apache.samza.Partition +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.util.Util._ +import org.junit.Assert._ + +class TestUtil { + @Test + def testGetTopicPartitionsForTask() { + def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet + + val taskCount = 4 + val streamsMap = Map("kafka.a" -> partitionSet(4), "kafka.b" -> partitionSet(18), "timestream.c" -> partitionSet(24)) + val streamsAndParts = (for(s <- streamsMap.keys; + part <- streamsMap.getOrElse(s, Set.empty)) + yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet + + for(i <- 0 until taskCount) { + val result: Set[SystemStreamPartition] = Util.getStreamsAndPartitionsForContainer(i, taskCount, streamsAndParts) + // b -> 18 % 4 = 2 therefore first two results should have an extra element + if(i < 2) { + assertEquals(12, result.size) + } else { + assertEquals(11, result.size) + } + + result.foreach(r => assertEquals(i, r.getPartition.getPartitionId % taskCount)) + } + } + + @Test + def testCreateStreamPartitionStringBlocksDelimeters() { + val partOne = new Partition(1) + val toTry = List(':', ',', '/') + .map(ch => (ch, Set(new SystemStreamPartition("kafka", "good1", partOne), + new SystemStreamPartition("kafka", "bad" + ch, partOne), + new SystemStreamPartition("notkafka", "alsogood", partOne)))) + toTry.foreach(t => try { + createStreamPartitionString(t._2) + fail("Should have thrown an exception") + } catch { + case iae:IllegalArgumentException => + val expected = "SystemStreamPartition [partition=Partition [partition=1], system" + + "=kafka, stream=bad" + t._1 + "] contains illegal character " + t._1 + assertEquals(expected, iae.getMessage) + } ) + } + + @Test + def testCreateStreamPartitionStringRoundTrip() { + val getPartitions = { + // Build a heavily skewed set of partitions. + def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet + val system = "all-same-system." 
+ val lotsOfParts = Map(system + "topic-with-many-parts-a" -> partitionSet(128), + system + "topic-with-many-parts-b" -> partitionSet(128), system + "topic-with-many-parts-c" -> partitionSet(64)) + val fewParts = ('c' to 'z').map(l => system + l.toString -> partitionSet(4)).toMap + val streamsMap = (lotsOfParts ++ fewParts) + (for(s <- streamsMap.keys; + part <- streamsMap.getOrElse(s, Set.empty)) yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet + } + + val streamsAndParts = getStreamsAndPartitionsForContainer(0, 4, getPartitions) + println(streamsAndParts) + val asString = createStreamPartitionString(streamsAndParts) + println(asString) + val backToStreamsAndParts = createStreamPartitionsFromString(asString) + + assertEquals(streamsAndParts, backToStreamsAndParts) + + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index bc94f6a..2197b01 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -73,7 +73,9 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName)) val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId) val stateTopic = getTopic(jobName, jobId) - val totalPartitions = Util.getMaxInputStreamPartitions(config).size + + // This is a reasonably expensive operation and the TaskInstance already knows the answer. Should use that info. 
+ val totalPartitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet.size new KafkaCheckpointManager( clientId, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala index 9f4db17..7a63a2b 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala @@ -21,7 +21,6 @@ package org.apache.samza.job.yarn import org.apache.hadoop.yarn.api.records.ContainerStatus import org.apache.hadoop.yarn.api.records.Container import org.apache.samza.config.Config -import org.apache.samza.Partition import grizzled.slf4j.Logging import org.apache.samza.config.YarnConfig.Config2Yarn import org.apache.samza.config.YarnConfig @@ -29,21 +28,15 @@ import org.apache.samza.job.CommandBuilder import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.fs.Path -import org.apache.samza.task.TaskContext -import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.TaskConfig.Config2Task -import scala.collection.JavaConversions._ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus import org.apache.samza.util.Util import scala.collection.JavaConversions._ -import org.apache.samza.SamzaException import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.api.records.Priority import org.apache.hadoop.yarn.api.records.Resource import org.apache.hadoop.yarn.util.Records -import org.apache.hadoop.security.token.Token import org.apache.hadoop.yarn.api.records.LocalResource import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.api.records.ContainerLaunchContext @@ -51,17 +44,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.records.LocalResourceType import org.apache.hadoop.yarn.api.records.LocalResourceVisibility -import org.apache.hadoop.yarn.ipc.YarnRPC -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier -import org.apache.hadoop.io.Text -import org.apache.hadoop.net.NetUtils import java.util.Collections -import java.security.PrivilegedAction import org.apache.samza.job.ShellCommandBuilder import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.yarn.security.AMRMTokenIdentifier import java.nio.ByteBuffer -import org.apache.hadoop.yarn.client.api.NMClient import org.apache.hadoop.yarn.client.api.impl.NMClientImpl object SamzaAppMasterTaskManager { @@ -69,10 +56,6 @@ object SamzaAppMasterTaskManager { val DEFAULT_CPU_CORES = 1 val DEFAULT_CONTAINER_RETRY_COUNT = 8 val DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000 - - def getPartitionsForTask(taskId: Int, taskCount: Int, partitions: Set[Partition]) = { - partitions.filter(_.getPartitionId % taskCount == taskId).toSet - } } case class TaskFailure(val count: Int, val lastFailure: Long) @@ -93,7 +76,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, 
state: SamzaA 1 } - val partitions = Util.getMaxInputStreamPartitions(config) + val allSystemStreamPartitions = Util.getInputStreamPartitions(config) var taskFailures = Map[Int, TaskFailure]() var tooManyFailedContainers = false var containerManager: NMClientImpl = null @@ -126,14 +109,13 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA state.unclaimedTasks.headOption match { case Some(taskId) => { info("Got available task id (%d) for container: %s" format (taskId, container)) - val partitionsForTask = getPartitionsForTask(taskId, state.taskCount, partitions) - info("Claimed partitions %s for task ID %s" format (partitionsForTask, taskId)) + val streamsAndPartitionsForTask = Util.getStreamsAndPartitionsForContainer(taskId, state.taskCount, allSystemStreamPartitions) + info("Claimed partitions %s for container ID %s" format (allSystemStreamPartitions, taskId)) val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName) val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder] .setConfig(config) .setName("samza-container-%s" format taskId) - .setPartitions(partitionsForTask) - .setTotalPartitions(partitions.size) + .setStreamPartitions(streamsAndPartitionsForTask) val command = cmdBuilder.buildCommand info("Task ID %s using command %s" format (taskId, command)) val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) } @@ -150,7 +132,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA state.neededContainers -= 1 state.runningTasks += taskId -> container state.unclaimedTasks -= taskId - state.taskPartitions += taskId -> partitionsForTask + state.taskPartitions += taskId -> streamsAndPartitionsForTask.map(_.getPartition).toSet info("Claimed task ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr)) @@ -200,7 +182,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA state.releasedContainers += 1 - // If this container was assigned some partitions (a taskId), then + // If this container was assigned some partitions (a containerId), then // clean up, and request a new container for the tasks. This only // should happen if the container was 'lost' due to node failure, not // if the AM released the container. 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b694e7ad/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala index ee3ffef..a7b5564 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala @@ -20,29 +20,25 @@ package org.apache.samza.job.yarn import org.junit.Assert._ import org.junit.Test -import scala.collection.JavaConversions._ import org.apache.samza.config.Config import org.apache.samza.config.MapConfig -import org.apache.samza.Partition +import org.apache.samza.{Partition, SamzaException} import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.util.ConverterUtils import scala.collection.JavaConversions._ import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse -import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl -import org.apache.hadoop.service._ import org.apache.hadoop.fs.Path import org.apache.hadoop.yarn.api.records.NodeReport import TestSamzaAppMasterTaskManager._ -import org.apache.samza.system.SystemAdmin -import org.apache.samza.system.SystemFactory +import org.apache.samza.system.{SystemStreamPartition, SystemAdmin, SystemFactory} import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.SamzaException +import org.apache.samza.util.Util._ +import org.apache.samza.util.Util object TestSamzaAppMasterTaskManager { def getContainer(containerId: ContainerId) = new Container { @@ -133,6 +129,7 @@ object TestSamzaAppMasterTaskManager { } class TestSamzaAppMasterTaskManager { + val config = new MapConfig(Map[String, String]( "yarn.container.count" -> "1", "systems.test-system.samza.factory" -> "org.apache.samza.job.yarn.MockSystemFactory", @@ -379,34 +376,34 @@ class TestSamzaAppMasterTaskManager { @Test def testPartitionsShouldWorkWithMoreTasksThanPartitions { - val onePartition = Set(new Partition(0)) - assert(SamzaAppMasterTaskManager.getPartitionsForTask(0, 2, onePartition).equals(Set(new Partition(0)))) - assert(SamzaAppMasterTaskManager.getPartitionsForTask(1, 2, onePartition).equals(Set())) + val onePartition = Set(new SystemStreamPartition("system", "stream", new Partition(0))) + assert(Util.getStreamsAndPartitionsForContainer(0, 2, onePartition).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0))))) + assert(Util.getStreamsAndPartitionsForContainer(1, 2, onePartition).equals(Set())) } @Test def testPartitionsShouldWorkWithMorePartitionsThanTasks { - val fivePartitions = (0 until 5).map(new Partition(_)).toSet - assert(SamzaAppMasterTaskManager.getPartitionsForTask(0, 2, fivePartitions).equals(Set(new Partition(0), new Partition(2), new Partition(4)))) - assert(SamzaAppMasterTaskManager.getPartitionsForTask(1, 2, fivePartitions).equals(Set(new Partition(1), new Partition(3)))) + val fivePartitions = (0 until 
5).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet + assert(Util.getStreamsAndPartitionsForContainer(0, 2, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0)), new SystemStreamPartition("system", "stream", new Partition(2)), new SystemStreamPartition("system", "stream", new Partition(4))))) + assert(Util.getStreamsAndPartitionsForContainer(1, 2, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(1)), new SystemStreamPartition("system", "stream", new Partition(3))))) } @Test def testPartitionsShouldWorkWithTwelvePartitionsAndFiveContainers { - val fivePartitions = (0 until 12).map(new Partition(_)).toSet - assert(SamzaAppMasterTaskManager.getPartitionsForTask(0, 5, fivePartitions).equals(Set(new Partition(0), new Partition(5), new Partition(10)))) - assert(SamzaAppMasterTaskManager.getPartitionsForTask(1, 5, fivePartitions).equals(Set(new Partition(1), new Partition(6), new Partition(11)))) - assert(SamzaAppMasterTaskManager.getPartitionsForTask(2, 5, fivePartitions).equals(Set(new Partition(2), new Partition(7)))) - assert(SamzaAppMasterTaskManager.getPartitionsForTask(3, 5, fivePartitions).equals(Set(new Partition(3), new Partition(8)))) - assert(SamzaAppMasterTaskManager.getPartitionsForTask(4, 5, fivePartitions).equals(Set(new Partition(4), new Partition(9)))) + val fivePartitions = (0 until 12).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet + assert(Util.getStreamsAndPartitionsForContainer(0, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0)), new SystemStreamPartition("system", "stream", new Partition(5)), new SystemStreamPartition("system", "stream", new Partition(10))))) + assert(Util.getStreamsAndPartitionsForContainer(1, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(1)), new SystemStreamPartition("system", "stream", new Partition(6)), new SystemStreamPartition("system", "stream", new Partition(11))))) + assert(Util.getStreamsAndPartitionsForContainer(2, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(2)), new SystemStreamPartition("system", "stream", new Partition(7))))) + assert(Util.getStreamsAndPartitionsForContainer(3, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(3)), new SystemStreamPartition("system", "stream", new Partition(8))))) + assert(Util.getStreamsAndPartitionsForContainer(4, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(4)), new SystemStreamPartition("system", "stream", new Partition(9))))) } @Test def testPartitionsShouldWorkWithEqualPartitionsAndTasks { - val twoPartitions = (0 until 2).map(new Partition(_)).toSet - assert(SamzaAppMasterTaskManager.getPartitionsForTask(0, 2, twoPartitions).equals(Set(new Partition(0)))) - assert(SamzaAppMasterTaskManager.getPartitionsForTask(1, 2, twoPartitions).equals(Set(new Partition(1)))) - assert(SamzaAppMasterTaskManager.getPartitionsForTask(0, 1, Set(new Partition(0))).equals(Set(new Partition(0)))) + val twoPartitions = (0 until 2).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet + assert(Util.getStreamsAndPartitionsForContainer(0, 2, twoPartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0))))) + assert(Util.getStreamsAndPartitionsForContainer(1, 2, twoPartitions).equals(Set(new SystemStreamPartition("system", 
"stream", new Partition(1))))) + assert(Util.getStreamsAndPartitionsForContainer(0, 1, Set(new SystemStreamPartition("system", "stream", new Partition(0)))).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0))))) } val clock = () => System.currentTimeMillis

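
The SAMZA_SYSTEM_STREAMS value passed to each container uses the wire format documented on createStreamPartitionString: partition groups separated by "/", the partition id separated from its system.stream names by ":", and the names themselves by "," (for example, 4:a,b/0:a,b,c). The following standalone sketch mirrors that encoding and its inverse under the same stand-in SSP type as above; the encode and decode functions are illustrative, not the actual Util.createStreamPartitionString and Util.createStreamPartitionsFromString.

object StreamPartitionEncodingSketch {
  // Stand-in for org.apache.samza.system.SystemStreamPartition.
  case class SSP(system: String, stream: String, partition: Int)

  // Group by partition, emit "partition:system.stream,system.stream", join groups with "/".
  def encode(ssps: Set[SSP]): String =
    ssps.groupBy(_.partition)
      .map { case (part, group) => s"$part:${group.map(s => s.system + "." + s.stream).mkString(",")}" }
      .mkString("/")

  // Invert encode: split out the groups, then the partition id, then each system.stream name.
  def decode(encoded: String): Set[SSP] =
    encoded.split("/").toSet.flatMap { (group: String) =>
      val colon = group.indexOf(':')
      val part = group.substring(0, colon).toInt
      group.substring(colon + 1).split(",").map { name =>
        val dot = name.indexOf('.')  // "kafka.topic" -> system "kafka", stream "topic"
        SSP(name.substring(0, dot), name.substring(dot + 1), part)
      }.toSet
    }

  def main(args: Array[String]): Unit = {
    val ssps = Set(SSP("kafka", "a", 0), SSP("kafka", "b", 0), SSP("kafka", "a", 4))
    val wire = encode(ssps)        // one possible rendering: "0:kafka.a,kafka.b/4:kafka.a"
    assert(decode(wire) == ssps)   // the format round-trips, as TestUtil checks for the real functions
  }
}

This also explains why createStreamPartitionString rejects stream names containing ':', ',' or '/': any of those characters would make the encoded string ambiguous when the container parses it back.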