SAMZA-123: Move topic partition grouping to the AM and generalize
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/da79b6f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/da79b6f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/da79b6f9 Branch: refs/heads/master Commit: da79b6f92f44f44ca7649fbca20cc5389b28e29f Parents: 7cecf0a Author: Jakob Homan <[email protected]> Authored: Mon Jul 28 15:11:57 2014 -0700 Committer: Jakob Homan <[email protected]> Committed: Mon Jul 28 15:11:57 2014 -0700 ---------------------------------------------------------------------- .gitignore | 2 + build.gradle | 1 + .../0.7.0/container/samza-container.md | 10 +- .../org/apache/samza/checkpoint/Checkpoint.java | 40 +- .../samza/checkpoint/CheckpointManager.java | 34 +- .../samza/container/SamzaContainerContext.java | 14 +- .../container/SystemStreamPartitionGrouper.java | 40 ++ .../SystemStreamPartitionGrouperFactory.java | 28 ++ .../org/apache/samza/container/TaskName.java | 63 ++++ .../org/apache/samza/job/CommandBuilder.java | 13 +- .../java/org/apache/samza/task/TaskContext.java | 9 +- .../samza/checkpoint/CheckpointTool.scala | 104 +++--- .../apache/samza/checkpoint/OffsetManager.scala | 68 ++-- .../file/FileSystemCheckpointManager.scala | 57 ++- .../org/apache/samza/config/JobConfig.scala | 6 + .../samza/config/ShellCommandConfig.scala | 7 +- .../org/apache/samza/container/RunLoop.scala | 45 ++- .../apache/samza/container/SamzaContainer.scala | 112 +++--- .../SystemStreamPartitionTaskNameGrouper.scala | 38 ++ .../apache/samza/container/TaskInstance.scala | 75 ++-- .../samza/container/TaskInstanceMetrics.scala | 8 +- .../TaskNamesToSystemStreamPartitions.scala | 145 ++++++++ .../groupers/GroupByPartition.scala | 41 +++ .../groupers/GroupBySystemStreamPartition.scala | 38 ++ ...leSystemStreamPartitionTaskNameGrouper.scala | 50 +++ .../apache/samza/job/ShellCommandBuilder.scala | 66 +++- 
.../samza/job/local/LocalJobFactory.scala | 28 +- .../samza/serializers/CheckpointSerde.scala | 121 +++--- .../samza/storage/TaskStorageManager.scala | 13 +- .../apache/samza/task/ReadableCoordinator.scala | 4 +- .../main/scala/org/apache/samza/util/Util.scala | 193 ++++++++-- .../samza/checkpoint/TestCheckpointTool.scala | 43 ++- .../samza/checkpoint/TestOffsetManager.scala | 81 ++-- .../file/TestFileSystemCheckpointManager.scala | 47 ++- .../SystemStreamPartitionGrouperTestBase.scala | 57 +++ .../apache/samza/container/TestRunLoop.scala | 68 ++-- .../samza/container/TestSamzaContainer.scala | 11 +- .../samza/container/TestTaskInstance.scala | 6 +- .../TestTaskNamesToSystemStreamPartitions.scala | 71 ++++ .../groupers/TestGroupByPartition.scala | 37 ++ .../TestGroupBySystemStreamPartition.scala | 41 +++ ...leSystemStreamPartitionTaskNameGrouper.scala | 54 +++ .../org/apache/samza/job/TestJobRunner.scala | 1 - .../samza/job/TestShellCommandBuilder.scala | 52 +++ .../apache/samza/metrics/TestJmxServer.scala | 1 - .../samza/serializers/TestCheckpointSerde.scala | 46 +-- .../TestFileReaderSystemConsumer.scala | 14 +- .../samza/task/TestReadableCoordinator.scala | 12 +- .../scala/org/apache/samza/util/TestUtil.scala | 64 ++-- .../kafka/KafkaCheckpointLogKey.scala | 186 ++++++++++ .../kafka/KafkaCheckpointManager.scala | 366 +++++++++++++------ .../kafka/KafkaCheckpointManagerFactory.scala | 44 +-- .../samza/system/kafka/TopicMetadataCache.scala | 2 - .../kafka/TestKafkaCheckpointLogKey.scala | 71 ++++ .../kafka/TestKafkaCheckpointManager.scala | 81 ++-- .../system/kafka/TestKafkaSystemAdmin.scala | 4 +- .../samza/storage/kv/LevelDbKeyValueStore.scala | 8 +- .../samza/test/integration/join/Emitter.java | 7 +- .../performance/TestKeyValuePerformance.scala | 9 +- .../test/integration/TestStatefulTask.scala | 119 +++--- .../resources/scalate/WEB-INF/views/index.scaml | 25 +- .../samza/job/yarn/SamzaAppMasterState.scala | 8 +- .../job/yarn/SamzaAppMasterTaskManager.scala | 
19 +- .../webapp/ApplicationMasterRestServlet.scala | 5 +- .../yarn/TestSamzaAppMasterTaskManager.scala | 59 +-- 65 files changed, 2335 insertions(+), 857 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index db9d3ec..7cbbfd6 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,5 @@ build samza-test/state docs/learn/documentation/0.7.0/api/javadocs .DS_Store +out/ +*.patch http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index d1ee5e8..c262b5f 100644 --- a/build.gradle +++ b/build.gradle @@ -144,6 +144,7 @@ project(":samza-kafka_$scalaVersion") { compile "org.apache.zookeeper:zookeeper:$zookeeperVersion" compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion" compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion" + compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion" testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/docs/learn/documentation/0.7.0/container/samza-container.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/0.7.0/container/samza-container.md b/docs/learn/documentation/0.7.0/container/samza-container.md index a96ab4a..ab4f0e4 100644 --- a/docs/learn/documentation/0.7.0/container/samza-container.md +++ b/docs/learn/documentation/0.7.0/container/samza-container.md @@ -45,7 +45,7 @@ public interface InitableTask { } {% endhighlight %} -How many instances of your task class are created depends on the number of partitions in the job's 
input streams. If your Samza job has ten partitions, there will be ten instantiations of your task class: one for each partition. The first task instance will receive all messages for partition one, the second instance will receive all messages for partition two, and so on. +By default, how many instances of your task class are created depends on the number of partitions in the job's input streams. If your Samza job has ten partitions, there will be ten instantiations of your task class: one for each partition. The first task instance will receive all messages for partition one, the second instance will receive all messages for partition two, and so on. <img src="/img/0.7.0/learn/documentation/container/tasks-and-partitions.svg" alt="Illustration of tasks consuming partitions" class="diagram-large"> @@ -53,7 +53,13 @@ The number of partitions in the input streams is determined by the systems from If a Samza job has more than one input stream, the number of task instances for the Samza job is the maximum number of partitions across all input streams. For example, if a Samza job is reading from PageViewEvent (12 partitions), and ServiceMetricEvent (14 partitions), then the Samza job would have 14 task instances (numbered 0 through 13). Task instances 12 and 13 only receive events from ServiceMetricEvent, because there is no corresponding PageViewEvent partition. -There is [work underway](https://issues.apache.org/jira/browse/SAMZA-71) to make the assignment of partitions to tasks more flexible in future versions of Samza. +With this default approach to assigning input streams to task instances, Samza is effectively performing a group-by operation on the input streams with their partitions as the key. 
Other strategies for grouping input stream partitions are possible by implementing a new [SystemStreamPartitionGrouper](../api/javadocs/org/apache/samza/container/SystemStreamPartitionGrouper.html) and factory, and configuring the job to use it via the job.systemstreampartition.grouper.factory configuration value. + +Samza provides the above-discussed per-partition grouper as well as the [GroupBySystemStreamPartitionGrouper](../api/javadocs/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.html), which provides a separate task class instance for every input stream partition, effectively grouping by the input stream itself. This provides maximum scalability in terms of how many containers can be used to process those input streams and is appropriate for very high volume jobs that need no grouping of the input streams. + +Considering the above example of a PageViewEvent partitioned 12 ways and a ServiceMetricEvent partitioned 14 ways, the GroupBySystemStreamPartitionGrouper would create 12 + 14 = 26 task instances, which would then be distributed across the number of containers configured, as discussed below. + +Note that once a job has been started using a particular SystemStreamPartitionGrouper and that job is using state or checkpointing, it is not possible to change that grouping in subsequent job starts, as the previous checkpoints and state information would likely be incorrect under the new grouping approach. 
### Containers and resource allocation http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java index 6fad1fa..593d118 100644 --- a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java @@ -19,24 +19,24 @@ package org.apache.samza.checkpoint; +import org.apache.samza.system.SystemStreamPartition; + import java.util.Collections; import java.util.Map; -import org.apache.samza.system.SystemStream; - /** * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each. * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part * of restarting a failed container within a running job. */ public class Checkpoint { - private final Map<SystemStream, String> offsets; + private final Map<SystemStreamPartition, String> offsets; /** * Constructs a new checkpoint based off a map of Samza stream offsets. * @param offsets Map of Samza streams to their current offset. */ - public Checkpoint(Map<SystemStream, String> offsets) { + public Checkpoint(Map<SystemStreamPartition, String> offsets) { this.offsets = offsets; } @@ -44,33 +44,25 @@ public class Checkpoint { * Gets a unmodifiable view of the current Samza stream offsets. * @return A unmodifiable view of a Map of Samza streams to their recorded offsets. */ - public Map<SystemStream, String> getOffsets() { + public Map<SystemStreamPartition, String> getOffsets() { return Collections.unmodifiableMap(offsets); } @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((offsets == null) ? 
0 : offsets.hashCode()); - return result; + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Checkpoint)) return false; + + Checkpoint that = (Checkpoint) o; + + if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false; + + return true; } @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Checkpoint other = (Checkpoint) obj; - if (offsets == null) { - if (other.offsets != null) - return false; - } else if (!offsets.equals(other.offsets)) - return false; - return true; + public int hashCode() { + return offsets != null ? offsets.hashCode() : 0; } @Override http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java index a6e1ba6..092cb91 100644 --- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java @@ -19,7 +19,10 @@ package org.apache.samza.checkpoint; -import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.container.TaskName; + +import java.util.Map; /** * CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint} to some @@ -30,23 +33,38 @@ public interface CheckpointManager { /** * Registers this manager to write checkpoints of a specific Samza stream partition. - * @param partition Specific Samza stream partition of which to write checkpoints for. + * @param taskName Specific Samza taskName of which to write checkpoints for. 
*/ - public void register(Partition partition); + public void register(TaskName taskName); /** * Writes a checkpoint based on the current state of a Samza stream partition. - * @param partition Specific Samza stream partition of which to write a checkpoint of. + * @param taskName Specific Samza taskName of which to write a checkpoint of. * @param checkpoint Reference to a Checkpoint object to store offset data in. */ - public void writeCheckpoint(Partition partition, Checkpoint checkpoint); + public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint); /** - * Returns the last recorded checkpoint for a specified Samza stream partition. - * @param partition Specific Samza stream partition for which to get the last checkpoint of. + * Returns the last recorded checkpoint for a specified taskName. + * @param taskName Specific Samza taskName for which to get the last checkpoint of. * @return A Checkpoint object with the recorded offset data of the specified partition. */ - public Checkpoint readLastCheckpoint(Partition partition); + public Checkpoint readLastCheckpoint(TaskName taskName); + + /** + * Read the taskName to partition mapping that is being maintained by this CheckpointManager + * + * @return TaskName to task log partition mapping, or an empty map if there were no messages. 
+ */ + public Map<TaskName, Integer> readChangeLogPartitionMapping(); + + /** + * Write the taskName to partition mapping that is being maintained by this CheckpointManager + * + * @param mapping Each TaskName's partition within the changelog + */ + public void writeChangeLogPartitionMapping(Map<TaskName, Integer> mapping); public void stop(); + } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java index 78d56a9..c8693c8 100644 --- a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java +++ b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java @@ -19,32 +19,32 @@ package org.apache.samza.container; -import java.util.Collection; - -import org.apache.samza.Partition; import org.apache.samza.config.Config; +import java.util.Collection; +import java.util.Collections; + /** * A SamzaContainerContext maintains per-container information for the tasks it executes. */ public class SamzaContainerContext { public final String name; public final Config config; - public final Collection<Partition> partitions; + public final Collection<TaskName> taskNames; /** * An immutable context object that can passed to tasks to give them information * about the container in which they are executing. * @param name The name of the container (either a YARN AM or SamzaContainer). * @param config The job configuration. - * @param partitions The set of input partitions assigned to this container. + * @param taskNames The set of taskName keys for which this container is responsible. 
*/ public SamzaContainerContext( String name, Config config, - Collection<Partition> partitions) { + Collection<TaskName> taskNames) { this.name = name; this.config = config; - this.partitions = partitions; + this.taskNames = Collections.unmodifiableCollection(taskNames); } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java new file mode 100644 index 0000000..897d9f5 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container; + +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Map; +import java.util.Set; + +/** + * Group a set of SystemStreamPartitions into logical taskNames that share a common characteristic, defined + * by the implementation. 
Each taskName has a key that uniquely describes what sets may be in it, but does + * not generally enumerate the elements of those sets. For example, a SystemStreamPartitionGrouper that + * groups SystemStreamPartitions (each with 4 partitions) by their partition, would end up generating + * four TaskNames: 0, 1, 2, 3. These TaskNames describe the partitions but do not list all of the + * SystemStreamPartitions, which allows new SystemStreamPartitions to be added later without changing + * the definition of the TaskNames, assuming these new SystemStreamPartitions do not have more than + * four partitions. On the other hand, a SystemStreamPartitionGrouper that wanted each SystemStreamPartition + * to be its own, unique group would use the SystemStreamPartition's entire description to generate + * the TaskNames. + */ +public interface SystemStreamPartitionGrouper { + public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps); +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java new file mode 100644 index 0000000..10ac6e2 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container; + +import org.apache.samza.config.Config; + +/** + * Return an instance of a SystemStreamPartitionGrouper per the particular implementation + */ +public interface SystemStreamPartitionGrouperFactory { + public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config); +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/container/TaskName.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/container/TaskName.java b/samza-api/src/main/java/org/apache/samza/container/TaskName.java new file mode 100644 index 0000000..13a1206 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/container/TaskName.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container; + +/** + * A unique identifier of a set of a SystemStreamPartitions that have been grouped by + * a {@link org.apache.samza.container.SystemStreamPartitionGrouper}. The + * SystemStreamPartitionGrouper determines the TaskName for each set it creates. + */ +public class TaskName implements Comparable<TaskName> { + private final String taskName; + + public String getTaskName() { + return taskName; + } + + public TaskName(String taskName) { + this.taskName = taskName; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TaskName taskName1 = (TaskName) o; + + if (!taskName.equals(taskName1.taskName)) return false; + + return true; + } + + @Override + public int hashCode() { + return taskName.hashCode(); + } + + @Override + public String toString() { + return taskName; + } + + @Override + public int compareTo(TaskName that) { + return taskName.compareTo(that.taskName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java index cb40092..f510ce5 100644 --- a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java +++ b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java @@ -20,6 +20,7 @@ package org.apache.samza.job; import org.apache.samza.config.Config; +import org.apache.samza.container.TaskName; import org.apache.samza.system.SystemStreamPartition; import java.util.Map; @@ -30,12 +31,13 @@ import java.util.Set; * such as YARN or the LocalJobRunner. 
*/ public abstract class CommandBuilder { - protected Set<SystemStreamPartition> systemStreamPartitions; + protected Map<TaskName, Set<SystemStreamPartition>> taskNameToSystemStreamPartitionsMapping; + protected Map<TaskName, Integer> taskNameToChangeLogPartitionMapping; protected String name; protected Config config; - public CommandBuilder setStreamPartitions(Set<SystemStreamPartition> ssp) { - this.systemStreamPartitions = ssp; + public CommandBuilder setTaskNameToSystemStreamPartitionsMapping(Map<TaskName, Set<SystemStreamPartition>> systemStreamPartitionTaskNames) { + this.taskNameToSystemStreamPartitionsMapping = systemStreamPartitionTaskNames; return this; } @@ -54,6 +56,11 @@ public abstract class CommandBuilder { return this; } + public CommandBuilder setTaskNameToChangeLogPartitionMapping(Map<TaskName, Integer> mapping) { + this.taskNameToChangeLogPartitionMapping = mapping; + return this; + } + public abstract String buildCommand(); public abstract Map<String, String> buildEnvironment(); http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-api/src/main/java/org/apache/samza/task/TaskContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index 7c1b085..35de8cc 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -19,8 +19,11 @@ package org.apache.samza.task; -import org.apache.samza.Partition; +import org.apache.samza.container.TaskName; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Set; /** * A TaskContext provides resources about the {@link org.apache.samza.task.StreamTask}, particularly during @@ -29,7 +32,9 @@ import org.apache.samza.metrics.MetricsRegistry; public interface TaskContext { 
MetricsRegistry getMetricsRegistry(); - Partition getPartition(); + Set<SystemStreamPartition> getSystemStreamPartitions(); Object getStore(String name); + + TaskName getTaskName(); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala index 5735a39..84ea4ca 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala @@ -19,17 +19,19 @@ package org.apache.samza.checkpoint +import grizzled.slf4j.Logging import java.net.URI import java.util.regex.Pattern import joptsimple.OptionSet -import org.apache.samza.{Partition, SamzaException} -import org.apache.samza.config.{Config, StreamConfig} +import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap import org.apache.samza.config.TaskConfig.Config2Task +import org.apache.samza.config.{Config, StreamConfig} +import org.apache.samza.container.TaskName import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.{CommandLine, Util} +import org.apache.samza.{Partition, SamzaException} import scala.collection.JavaConversions._ -import grizzled.slf4j.Logging /** * Command-line tool for inspecting and manipulating the checkpoints for a job. @@ -44,7 +46,10 @@ import grizzled.slf4j.Logging * containing the offsets you want. 
It needs to be in the same format as the tool * prints out the latest checkpoint: * - * systems.<system>.streams.<topic>.partitions.<partition>=<offset> + * tasknames.<taskname>.systems.<system>.streams.<topic>.partitions.<partition>=<offset> + * + * The provided offset definitions will be grouped by <taskname> and written to + * individual checkpoint entries for each <taskname> * * NOTE: A job only reads its checkpoint when it starts up. Therefore, if you want * your checkpoint change to take effect, you have to first stop the job, then @@ -56,8 +61,10 @@ import grizzled.slf4j.Logging */ object CheckpointTool { /** Format in which SystemStreamPartition is represented in a properties file */ - val SSP_PATTERN = StreamConfig.STREAM_PREFIX + "partitions.%d" - val SSP_REGEX = Pattern.compile("systems\\.(.+)\\.streams\\.(.+)\\.partitions\\.([0-9]+)") + val SSP_PATTERN = "tasknames.%s." + StreamConfig.STREAM_PREFIX + "partitions.%d" + val SSP_REGEX = Pattern.compile("tasknames\\.(.+)\\.systems\\.(.+)\\.streams\\.(.+)\\.partitions\\.([0-9]+)") + + type TaskNameToCheckpointMap = Map[TaskName, Map[SystemStreamPartition, String]] class CheckpointToolCommandLine extends CommandLine with Logging { val newOffsetsOpt = @@ -68,20 +75,31 @@ object CheckpointTool { .ofType(classOf[URI]) .describedAs("path") - var newOffsets: Map[SystemStreamPartition, String] = null + var newOffsets: TaskNameToCheckpointMap = null - def parseOffsets(propertiesFile: Config): Map[SystemStreamPartition, String] = { - propertiesFile.entrySet.flatMap(entry => { + def parseOffsets(propertiesFile: Config): TaskNameToCheckpointMap = { + val taskNameSSPPairs = propertiesFile.entrySet.flatMap(entry => { val matcher = SSP_REGEX.matcher(entry.getKey) if (matcher.matches) { - val partition = new Partition(Integer.parseInt(matcher.group(3))) - val ssp = new SystemStreamPartition(matcher.group(1), matcher.group(2), partition) - Some(ssp -> entry.getValue) + val taskname = new TaskName(matcher.group(1)) + val 
partition = new Partition(Integer.parseInt(matcher.group(4))) + val ssp = new SystemStreamPartition(matcher.group(2), matcher.group(3), partition) + Some(taskname -> Map(ssp -> entry.getValue)) } else { warn("Warning: ignoring unrecognised property: %s = %s" format (entry.getKey, entry.getValue)) None } - }).toMap + }).toList + + if(taskNameSSPPairs.isEmpty) { + return null + } + + // Need to turn taskNameSSPPairs List[(taskname, Map[SystemStreamPartition, Offset])] to Map[TaskName, Map[SSP, Offset]] + taskNameSSPPairs // List[(taskname, Map[SystemStreamPartition, Offset])] + .groupBy(_._1) // Group by taskname + .mapValues(m => m.map(_._2)) // Drop the extra taskname that we grouped on + .mapValues(m => m.reduce( _ ++ _)) // Merge all the maps of SSPs->Offset into one for the whole taskname } override def loadConfig(options: OptionSet) = { @@ -103,7 +121,7 @@ object CheckpointTool { } } -class CheckpointTool(config: Config, newOffsets: Map[SystemStreamPartition, String]) extends Logging { +class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extends Logging { val manager = config.getCheckpointManagerFactory match { case Some(className) => Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap) @@ -113,52 +131,48 @@ class CheckpointTool(config: Config, newOffsets: Map[SystemStreamPartition, Stri // The CheckpointManagerFactory needs to perform this same operation when initializing // the manager. TODO figure out some way of avoiding duplicated work. 
- val partitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet def run { info("Using %s" format manager) - partitions.foreach(manager.register) + + // Find all the TaskNames that would be generated for this job config + val taskNames = Util.assignContainerToSSPTaskNames(config, 1).get(0).get.keys.toSet + + taskNames.foreach(manager.register) manager.start - val lastCheckpoint = readLastCheckpoint - logCheckpoint(lastCheckpoint, "Current checkpoint") + val lastCheckpoints = taskNames.map(tn => tn -> readLastCheckpoint(tn)).toMap + + lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2, "Current checkpoint for taskname "+ lcp._1)) if (newOffsets != null) { - logCheckpoint(newOffsets, "New offset to be written") - writeNewCheckpoint(newOffsets) - manager.stop - info("Ok, new checkpoint has been written.") + newOffsets.foreach(no => { + logCheckpoint(no._1, no._2, "New offset to be written for taskname " + no._1) + writeNewCheckpoint(no._1, no._2) + info("Ok, new checkpoint has been written for taskname " + no._1) + }) } + + manager.stop } - /** Load the most recent checkpoint state for all partitions. */ - def readLastCheckpoint: Map[SystemStreamPartition, String] = { - partitions.flatMap(partition => { - manager.readLastCheckpoint(partition) - .getOffsets - .map { case (systemStream, offset) => - new SystemStreamPartition(systemStream, partition) -> offset - } - }).toMap + /** Load the most recent checkpoint state for a specified TaskName. */ + def readLastCheckpoint(taskName:TaskName): Map[SystemStreamPartition, String] = { + manager.readLastCheckpoint(taskName).getOffsets.toMap } /** - * Store a new checkpoint state for all given partitions, overwriting the - * current state. Any partitions that are not mentioned will not - * be changed. 
+ * Store a new checkpoint state for specified TaskName, overwriting any previous + * checkpoint for that TaskName */ - def writeNewCheckpoint(newOffsets: Map[SystemStreamPartition, String]) { - newOffsets.groupBy(_._1.getPartition).foreach { - case (partition, offsets) => - val streamOffsets = offsets.map { case (ssp, offset) => ssp.getSystemStream -> offset }.toMap - val checkpoint = new Checkpoint(streamOffsets) - manager.writeCheckpoint(partition, checkpoint) - } + def writeNewCheckpoint(tn: TaskName, newOffsets: Map[SystemStreamPartition, String]) { + val checkpoint = new Checkpoint(newOffsets) + manager.writeCheckpoint(tn, checkpoint) } - def logCheckpoint(checkpoint: Map[SystemStreamPartition, String], prefix: String) { - checkpoint.map { case (ssp, offset) => - (CheckpointTool.SSP_PATTERN + " = %s") format (ssp.getSystem, ssp.getStream, ssp.getPartition.getPartitionId, offset) - }.toList.sorted.foreach(line => info(prefix + ": " + line)) + def logCheckpoint(tn: TaskName, checkpoint: Map[SystemStreamPartition, String], prefix: String) { + def logLine(tn:TaskName, ssp:SystemStreamPartition, offset:String) = (prefix + ": " + CheckpointTool.SSP_PATTERN + " = %s") format (tn.toString, ssp.getSystem, ssp.getStream, ssp.getPartition.getPartitionId, offset) + + checkpoint.keys.toList.sorted.foreach(ssp => info(logLine(tn, ssp, checkpoint.get(ssp).get))) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 9487b58..4efe997 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -20,7 +20,6 @@ package 
org.apache.samza.checkpoint import org.apache.samza.system.SystemStream -import org.apache.samza.Partition import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamMetadata.OffsetType @@ -31,6 +30,8 @@ import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.system.SystemAdmin +import org.apache.samza.container.TaskName +import scala.collection._ /** * OffsetSetting encapsulates a SystemStream's metadata, default offset, and @@ -150,13 +151,13 @@ class OffsetManager( /** * The set of system stream partitions that have been registered with the - * OffsetManager. These are the SSPs that will be tracked within the offset - * manager. + * OffsetManager, grouped by the taskName they belong to. These are the SSPs + * that will be tracked within the offset manager. */ - var systemStreamPartitions = Set[SystemStreamPartition]() + val systemStreamPartitions = mutable.Map[TaskName, mutable.Set[SystemStreamPartition]]() - def register(systemStreamPartition: SystemStreamPartition) { - systemStreamPartitions += systemStreamPartition + def register(taskName: TaskName, systemStreamPartitionsToRegister: Set[SystemStreamPartition]) { + systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()).addAll(systemStreamPartitionsToRegister) } def start { @@ -193,20 +194,18 @@ class OffsetManager( } /** - * Checkpoint all offsets for a given partition using the CheckpointManager. + * Checkpoint all offsets for a given TaskName using the CheckpointManager. */ - def checkpoint(partition: Partition) { + def checkpoint(taskName: TaskName) { if (checkpointManager != null) { - debug("Checkpointing offsets for partition %s." format partition) + debug("Checkpointing offsets for taskName %s." 
format taskName) - val partitionOffsets = lastProcessedOffsets - .filterKeys(_.getPartition.equals(partition)) - .map { case (systemStreamPartition, offset) => (systemStreamPartition.getSystemStream, offset) } - .toMap + val sspsForTaskName = systemStreamPartitions.getOrElse(taskName, throw new SamzaException("No such SystemStreamPartition set " + taskName + " registered for this checkpointmanager")).toSet + val partitionOffsets = lastProcessedOffsets.filterKeys(sspsForTaskName.contains(_)) - checkpointManager.writeCheckpoint(partition, new Checkpoint(partitionOffsets)) + checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets)) } else { - debug("Skipping checkpointing for partition %s because no checkpoint manager is defined." format partition) + debug("Skipping checkpointing for taskName %s because no checkpoint manager is defined." format taskName) } } @@ -221,23 +220,13 @@ class OffsetManager( } /** - * Returns a set of partitions that have been registered with this offset - * manager. - */ - private def getPartitions = { - systemStreamPartitions - .map(_.getPartition) - .toSet - } - - /** * Register all partitions with the CheckpointManager. 
*/ private def registerCheckpointManager { if (checkpointManager != null) { debug("Registering checkpoint manager.") - getPartitions.foreach(checkpointManager.register) + systemStreamPartitions.keys.foreach(checkpointManager.register) } else { debug("Skipping checkpoint manager registration because no manager was defined.") } @@ -253,9 +242,8 @@ class OffsetManager( checkpointManager.start - lastProcessedOffsets ++= getPartitions - .flatMap(restoreOffsetsFromCheckpoint(_)) - .filter { + lastProcessedOffsets ++= systemStreamPartitions.keys + .flatMap(restoreOffsetsFromCheckpoint(_)).filter { case (systemStreamPartition, offset) => val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream) if (!shouldKeep) { @@ -269,20 +257,17 @@ class OffsetManager( } /** - * Loads last processed offsets for a single partition. + * Loads last processed offsets for a single taskName. */ - private def restoreOffsetsFromCheckpoint(partition: Partition): Map[SystemStreamPartition, String] = { - debug("Loading checkpoints for partition: %s." format partition) + private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[SystemStreamPartition, String] = { + debug("Loading checkpoints for taskName: %s." format taskName) - val checkpoint = checkpointManager.readLastCheckpoint(partition) + val checkpoint = checkpointManager.readLastCheckpoint(taskName) if (checkpoint != null) { - checkpoint - .getOffsets - .map { case (systemStream, offset) => (new SystemStreamPartition(systemStream, partition), offset) } - .toMap + checkpoint.getOffsets.toMap } else { - info("Did not receive a checkpoint for partition %s. Proceeding without a checkpoint." format partition) + info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName) Map() } @@ -338,7 +323,12 @@ class OffsetManager( * that was registered, but has no offset. 
*/ private def loadDefaults { - systemStreamPartitions.foreach(systemStreamPartition => { + val allSSPs: Set[SystemStreamPartition] = systemStreamPartitions + .values + .flatten + .toSet + + allSSPs.foreach(systemStreamPartition => { if (!startingOffsets.contains(systemStreamPartition)) { val systemStream = systemStreamPartition.getSystemStream val partition = systemStreamPartition.getPartition http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala index 364e489..2a87a6e 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala @@ -20,40 +20,41 @@ package org.apache.samza.checkpoint.file import java.io.File +import java.io.FileNotFoundException import java.io.FileOutputStream -import scala.collection.JavaConversions._ -import scala.io.Source +import java.util import org.apache.samza.SamzaException -import org.apache.samza.serializers.CheckpointSerde -import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.checkpoint.Checkpoint +import org.apache.samza.checkpoint.CheckpointManager +import org.apache.samza.checkpoint.CheckpointManagerFactory import org.apache.samza.config.Config -import org.apache.samza.Partition -import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP -import org.apache.samza.checkpoint.CheckpointManagerFactory -import org.apache.samza.checkpoint.CheckpointManager -import org.apache.samza.checkpoint.Checkpoint -import 
java.io.FileNotFoundException +import org.apache.samza.config.JobConfig.Config2Job +import org.apache.samza.container.TaskName +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.serializers.CheckpointSerde +import scala.io.Source class FileSystemCheckpointManager( jobName: String, root: File, serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager { - def register(partition: Partition) { - } + override def register(taskName: TaskName):Unit = Unit - def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) { + def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, "checkpoints") + + def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { val bytes = serde.toBytes(checkpoint) - val fos = new FileOutputStream(getFile(jobName, partition)) + val fos = new FileOutputStream(getCheckpointFile(taskName)) fos.write(bytes) fos.close } - def readLastCheckpoint(partition: Partition): Checkpoint = { + def readLastCheckpoint(taskName: TaskName): Checkpoint = { try { - val bytes = Source.fromFile(getFile(jobName, partition)).map(_.toByte).toArray + val bytes = Source.fromFile(getCheckpointFile(taskName)).map(_.toByte).toArray serde.fromBytes(bytes) } catch { @@ -69,8 +70,28 @@ class FileSystemCheckpointManager( def stop {} - private def getFile(jobName: String, partition: Partition) = - new File(root, "%s-%d" format (jobName, partition.getPartitionId)) + private def getFile(jobName: String, taskName: TaskName, fileType:String) = + new File(root, "%s-%s-%s" format (jobName, taskName, fileType)) + + private def getChangeLogPartitionMappingFile() = getFile(jobName, new TaskName("partition-mapping"), "changelog-partition-mapping") + + override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = { + try { + val bytes = Source.fromFile(getChangeLogPartitionMappingFile()).map(_.toByte).toArray + serde.changelogPartitionMappingFromBytes(bytes) + } catch { + case e: FileNotFoundException => 
new util.HashMap[TaskName, java.lang.Integer]() + } + } + + def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = { + val hashmap = new util.HashMap[TaskName, java.lang.Integer](mapping) + val bytes = serde.changelogPartitionMappingToBytes(hashmap) + val fos = new FileOutputStream(getChangeLogPartitionMappingFile()) + + fos.write(bytes) + fos.close + } } class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index fcafe83..f84aeea 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -19,6 +19,8 @@ package org.apache.samza.config +import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory + object JobConfig { // job config constants val STREAM_JOB_FACTORY_CLASS = "job.factory.class" // streaming.job_factory_class @@ -34,6 +36,8 @@ object JobConfig { val JOB_NAME = "job.name" // streaming.job_name val JOB_ID = "job.id" // streaming.job_id + val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory" + implicit def Config2Job(config: Config) = new JobConfig(config) } @@ -47,4 +51,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) { def getConfigRewriters = getOption(JobConfig.CONFIG_REWRITERS) def getConfigRewriterClass(name: String) = getOption(JobConfig.CONFIG_REWRITER_CLASS format name) + + def getSystemStreamPartitionGrouperFactory = getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName) } 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala index 0cdc0d1..e4197fa 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala @@ -26,7 +26,12 @@ object ShellCommandConfig { val ENV_CONFIG = "SAMZA_CONFIG" /** - * An encoded list of the streams and partitions this container is responsible for. Encoded by + * All taskNames across the job; used to calculate state store partition mapping + */ + val ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING = "TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING" + + /** + * A serialized list of the streams and partitions this container is responsible for. 
Encoded by * {@link org.apache.samza.util.Util#createStreamPartitionString} */ val ENV_SYSTEM_STREAMS = "SAMZA_SYSTEM_STREAMS" http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index 4ca340c..7fb4763 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -20,8 +20,7 @@ package org.apache.samza.container import grizzled.slf4j.Logging -import org.apache.samza.Partition -import org.apache.samza.system.SystemConsumers +import org.apache.samza.system.{SystemStreamPartition, SystemConsumers} import org.apache.samza.task.ReadableCoordinator /** @@ -34,7 +33,7 @@ import org.apache.samza.task.ReadableCoordinator * be done when. */ class RunLoop( - val taskInstances: Map[Partition, TaskInstance], + val taskInstances: Map[TaskName, TaskInstance], val consumerMultiplexer: SystemConsumers, val metrics: SamzaContainerMetrics, val windowMs: Long = -1, @@ -43,10 +42,18 @@ class RunLoop( private var lastWindowMs = 0L private var lastCommitMs = 0L - private var taskShutdownRequests: Set[Partition] = Set() - private var taskCommitRequests: Set[Partition] = Set() + private var taskShutdownRequests: Set[TaskName] = Set() + private var taskCommitRequests: Set[TaskName] = Set() private var shutdownNow = false + // Messages come from the chooser with no connection to the TaskInstance they're bound for. + // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them. 
+ val systemStreamPartitionToTaskInstance: Map[SystemStreamPartition, TaskInstance] = { + // We could just pass in the SystemStreamPartitionMap during construction, but it's safer and cleaner to derive the information directly + def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap + + taskInstances.values.map{ getSystemStreamPartitionToTaskInstance }.flatten.toMap + } /** * Starts the run loop. Blocks until either the tasks request shutdown, or an @@ -61,7 +68,6 @@ class RunLoop( } } - /** * Chooses a message from an input stream to process, and calls the * process() method on the appropriate StreamTask to handle it. @@ -73,13 +79,15 @@ class RunLoop( val envelope = consumerMultiplexer.choose if (envelope != null) { - val partition = envelope.getSystemStreamPartition.getPartition + val ssp = envelope.getSystemStreamPartition - trace("Processing incoming message envelope for partition %s." format partition) + trace("Processing incoming message envelope for SSP %s." format ssp) metrics.envelopes.inc - val coordinator = new ReadableCoordinator(partition) - taskInstances(partition).process(envelope, coordinator) + val taskInstance = systemStreamPartitionToTaskInstance(ssp) + + val coordinator = new ReadableCoordinator(taskInstance.taskName) + taskInstance.process(envelope, coordinator) checkCoordinator(coordinator) } else { trace("No incoming message envelope was available.") @@ -87,7 +95,6 @@ class RunLoop( } } - /** * Invokes WindowableTask.window on all tasks if it's time to do so. 
*/ @@ -97,8 +104,8 @@ class RunLoop( lastWindowMs = clock() metrics.windows.inc - taskInstances.foreach { case (partition, task) => - val coordinator = new ReadableCoordinator(partition) + taskInstances.foreach { case (taskName, task) => + val coordinator = new ReadableCoordinator(taskName) task.window(coordinator) checkCoordinator(coordinator) } @@ -129,8 +136,8 @@ class RunLoop( } else if (!taskCommitRequests.isEmpty) { trace("Committing due to explicit commit request.") metrics.commits.inc - taskCommitRequests.foreach(partition => { - taskInstances(partition).commit + taskCommitRequests.foreach(taskName => { + taskInstances(taskName).commit }) } @@ -146,17 +153,17 @@ class RunLoop( */ private def checkCoordinator(coordinator: ReadableCoordinator) { if (coordinator.requestedCommitTask) { - debug("Task %s requested commit for current task only" format coordinator.partition) - taskCommitRequests += coordinator.partition + debug("Task %s requested commit for current task only" format coordinator.taskName) + taskCommitRequests += coordinator.taskName } if (coordinator.requestedCommitAll) { - debug("Task %s requested commit for all tasks in the container" format coordinator.partition) + debug("Task %s requested commit for all tasks in the container" format coordinator.taskName) taskCommitRequests ++= taskInstances.keys } if (coordinator.requestedShutdownOnConsensus) { - taskShutdownRequests += coordinator.partition + taskShutdownRequests += coordinator.taskName info("Shutdown has now been requested by tasks: %s" format taskShutdownRequests) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index a7142b2..d574ac4 100644 --- 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -19,12 +19,11 @@ package org.apache.samza.container -import java.io.File import grizzled.slf4j.Logging +import java.io.File import org.apache.samza.Partition import org.apache.samza.SamzaException -import org.apache.samza.checkpoint.CheckpointManager -import org.apache.samza.checkpoint.CheckpointManagerFactory +import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager} import org.apache.samza.config.Config import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer @@ -34,35 +33,34 @@ import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.config.serializers.JsonConfigSerializer +import org.apache.samza.job.ShellCommandBuilder import org.apache.samza.metrics.JmxServer import org.apache.samza.metrics.JvmMetrics +import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.metrics.MetricsReporter import org.apache.samza.metrics.MetricsReporterFactory import org.apache.samza.serializers.SerdeFactory import org.apache.samza.serializers.SerdeManager import org.apache.samza.storage.StorageEngineFactory import org.apache.samza.storage.TaskStorageManager +import org.apache.samza.system.StreamMetadataCache +import org.apache.samza.system.SystemConsumers +import org.apache.samza.system.SystemConsumersMetrics import org.apache.samza.system.SystemFactory +import org.apache.samza.system.SystemProducers +import org.apache.samza.system.SystemProducersMetrics import org.apache.samza.system.SystemStream +import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.chooser.DefaultChooser +import 
org.apache.samza.system.chooser.MessageChooserFactory +import org.apache.samza.system.chooser.RoundRobinChooserFactory +import org.apache.samza.task.ReadableCollector import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskLifecycleListener import org.apache.samza.task.TaskLifecycleListenerFactory import org.apache.samza.util.Util -import org.apache.samza.system.SystemProducers -import org.apache.samza.task.ReadableCollector -import org.apache.samza.system.SystemConsumers -import org.apache.samza.system.chooser.MessageChooserFactory -import org.apache.samza.system.SystemProducersMetrics -import org.apache.samza.system.SystemConsumersMetrics -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.system.chooser.DefaultChooser -import org.apache.samza.system.chooser.RoundRobinChooserFactory import scala.collection.JavaConversions._ -import org.apache.samza.system.SystemAdmin -import org.apache.samza.system.SystemStreamMetadata -import org.apache.samza.checkpoint.OffsetManager -import org.apache.samza.system.StreamMetadataCache object SamzaContainer extends Logging { @@ -92,29 +90,46 @@ object SamzaContainer extends Logging { * properties. Note: This is a temporary workaround to reduce the size of the config and hence size * of the environment variable(s) exported while starting a Samza container (SAMZA-337) */ - val isCompressed = if (System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE")) true else false + val isCompressed = System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE") val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed) val config = JsonConfigSerializer.fromJson(configStr) - val encodedStreamsAndPartitions = getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed) - val partitions = Util.deserializeSSPSetFromJSON(encodedStreamsAndPartitions) - - if (partitions.isEmpty) { - throw new SamzaException("No partitions for this task. 
Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.") - } + val sspTaskNames = getTaskNameToSystemStreamPartition(getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed)) + val taskNameToChangeLogPartitionMapping = getTaskNameToChangeLogPartitionMapping(getParameter(System.getenv(ShellCommandConfig.ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING), isCompressed)) try { - SamzaContainer(containerName, partitions, config).run + SamzaContainer(containerName, sspTaskNames, taskNameToChangeLogPartitionMapping, config).run } finally { jmxServer.stop } } - def apply(containerName: String, inputStreams: Set[SystemStreamPartition], config: Config) = { + def getTaskNameToSystemStreamPartition(SSPTaskNamesJSON: String) = { + // Covert into a standard Java map + val sspTaskNamesAsJava: Map[TaskName, Set[SystemStreamPartition]] = ShellCommandBuilder.deserializeSystemStreamPartitionSetFromJSON(SSPTaskNamesJSON) + + // From that map build the TaskNamesToSystemStreamPartitions + val sspTaskNames = TaskNamesToSystemStreamPartitions(sspTaskNamesAsJava) + + if (sspTaskNames.isEmpty) { + throw new SamzaException("No SystemStreamPartitions for this task. 
Can't run a task without SystemStreamPartition assignments.") + } + + sspTaskNames + } + + def getTaskNameToChangeLogPartitionMapping(taskNameToChangeLogPartitionMappingJSON: String) = { + // Convert that mapping into a Map + val taskNameToChangeLogPartitionMapping = ShellCommandBuilder.deserializeTaskNameToChangeLogPartitionMapping(taskNameToChangeLogPartitionMappingJSON).map(kv => kv._1 -> Integer.valueOf(kv._2)) + + taskNameToChangeLogPartitionMapping + } + + def apply(containerName: String, sspTaskNames: TaskNamesToSystemStreamPartitions, taskNameToChangeLogPartitionMapping: Map[TaskName, java.lang.Integer], config: Config) = { val containerPID = Util.getContainerPID info("Setting up Samza container: %s" format containerName) + info("Using SystemStreamPartition taskNames %s" format sspTaskNames) info("Samza container PID: %s" format containerPID) - info("Using streams and partitions: %s" format inputStreams) info("Using configuration: %s" format config) val registry = new MetricsRegistryMap(containerName) @@ -122,7 +137,7 @@ object SamzaContainer extends Logging { val systemProducersMetrics = new SystemProducersMetrics(registry) val systemConsumersMetrics = new SystemConsumersMetrics(registry) - val inputSystems = inputStreams.map(_.getSystem) + val inputSystems = sspTaskNames.getAllSystems() val systemNames = config.getSystemNames @@ -150,7 +165,7 @@ object SamzaContainer extends Logging { info("Got system factories: %s" format systemFactories.keys) val streamMetadataCache = new StreamMetadataCache(systemAdmins) - val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputStreams.map(_.getSystemStream)) + val inputStreamMetadata = streamMetadataCache.getStreamMetadata(sspTaskNames.getAllSystemStreams) info("Got input stream metadata: %s" format inputStreamMetadata) @@ -219,7 +234,7 @@ object SamzaContainer extends Logging { * A Helper function to build a Map[SystemStream, Serde] for streams defined in the config. 
This is useful to build both key and message serde maps. */ val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => { - (serdeStreams ++ inputStreams) + (serdeStreams ++ sspTaskNames.getAllSSPs()) .filter(systemStream => getSerdeName(systemStream).isDefined) .map(systemStream => { val serdeName = getSerdeName(systemStream).get @@ -384,20 +399,20 @@ object SamzaContainer extends Logging { info("Got commit milliseconds: %s" format taskCommitMs) - // Wire up all task-level (unshared) objects. + // Wire up all task-instance-level (unshared) objects. - val partitions = inputStreams.map(_.getPartition).toSet + val taskNames = sspTaskNames.keys.toSet - val containerContext = new SamzaContainerContext(containerName, config, partitions) + val containerContext = new SamzaContainerContext(containerName, config, taskNames) - val taskInstances = partitions.map(partition => { - debug("Setting up task instance: %s" format partition) + val taskInstances: Map[TaskName, TaskInstance] = taskNames.map(taskName => { + debug("Setting up task instance: %s" format taskName) val task = Util.getObj[StreamTask](taskClassName) val collector = new ReadableCollector - val taskInstanceMetrics = new TaskInstanceMetrics("Partition-%s" format partition.getPartitionId) + val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format taskName) val storeConsumers = changeLogSystemStreams .map { @@ -410,11 +425,13 @@ object SamzaContainer extends Logging { info("Got store consumers: %s" format storeConsumers) + val partitionForThisTaskName = new Partition(taskNameToChangeLogPartitionMapping(taskName)) + val taskStores = storageEngineFactories .map { case (storeName, storageEngineFactory) => val changeLogSystemStreamPartition = if (changeLogSystemStreams.contains(storeName)) { - new SystemStreamPartition(changeLogSystemStreams(storeName), partition) + new SystemStreamPartition(changeLogSystemStreams(storeName), partitionForThisTaskName) } else { null } @@ -426,7 
+443,7 @@ object SamzaContainer extends Logging { case Some(msgSerde) => serdes(msgSerde) case _ => null } - val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, partition) + val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) val storageEngine = storageEngineFactory.getStorageEngine( storeName, storePartitionDir, @@ -441,25 +458,26 @@ object SamzaContainer extends Logging { info("Got task stores: %s" format taskStores) - val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partition, changeLogMetadata) + val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partitionForThisTaskName, changeLogMetadata) - info("Assigning oldest change log offsets for partition %s: %s" format (partition, changeLogOldestOffsets)) + info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets)) val storageManager = new TaskStorageManager( - partition = partition, + taskName = taskName, taskStores = taskStores, storeConsumers = storeConsumers, changeLogSystemStreams = changeLogSystemStreams, changeLogOldestOffsets = changeLogOldestOffsets, - storeBaseDir = storeBaseDir) + storeBaseDir = storeBaseDir, + partitionForThisTaskName) - val inputStreamsForThisPartition = inputStreams.filter(_.getPartition.equals(partition)).map(_.getSystemStream) + val systemStreamPartitions: Set[SystemStreamPartition] = sspTaskNames.getOrElse(taskName, throw new SamzaException("Can't find taskName " + taskName + " in map of SystemStreamPartitions: " + sspTaskNames)) - info("Assigning SystemStreams " + inputStreamsForThisPartition + " to " + partition) + info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName) val taskInstance = new TaskInstance( task = task, - partition = partition, + taskName = taskName, config = config, metrics = taskInstanceMetrics, consumerMultiplexer = consumerMultiplexer, @@ -468,10 +486,10 @@ object 
SamzaContainer extends Logging { storageManager = storageManager, reporters = reporters, listeners = listeners, - inputStreams = inputStreamsForThisPartition, + systemStreamPartitions = systemStreamPartitions, collector = collector) - (partition, taskInstance) + (taskName, taskInstance) }).toMap val runLoop = new RunLoop( @@ -506,7 +524,7 @@ object SamzaContainer extends Logging { } class SamzaContainer( - taskInstances: Map[Partition, TaskInstance], + taskInstances: Map[TaskName, TaskInstance], runLoop: RunLoop, consumerMultiplexer: SystemConsumers, producerMultiplexer: SystemProducers, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala new file mode 100644 index 0000000..a8c93ac --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container + +/** + * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of + * {@link org.apache.samza.container.SystemStreamPartitionGrouper}, we can then map those groupings onto + * the {@link org.apache.samza.container.SamzaContainer}s on which they will run. This class takes + * those groupings-of-SSPs and groups them together according to which container each should run on. A simple + * implementation could assign each TaskNamesToSystemStreamPartition to a separate container. More + * advanced implementations could examine the TaskNamesToSystemStreamPartition to group them + * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc. + */ +trait SystemStreamPartitionTaskNameGrouper { + /** + * Group TaskNamesToSystemStreamPartitions onto the containers they will share + * + * @param taskNames Pre-grouped SSPs + * @return Mapping of container ID to set of TaskNames it will run + */ + def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 99a9841..9484ddb 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -21,33 +21,27 @@ package org.apache.samza.container import org.apache.samza.metrics.MetricsReporter import org.apache.samza.config.Config -import org.apache.samza.Partition import grizzled.slf4j.Logging -import
scala.collection.JavaConversions._ import org.apache.samza.storage.TaskStorageManager -import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.system.SystemStreamPartition import org.apache.samza.task.TaskContext import org.apache.samza.task.ClosableTask import org.apache.samza.task.InitableTask import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.task.WindowableTask -import org.apache.samza.checkpoint.CheckpointManager import org.apache.samza.task.TaskLifecycleListener import org.apache.samza.task.StreamTask -import org.apache.samza.system.SystemStream -import org.apache.samza.checkpoint.Checkpoint import org.apache.samza.task.ReadableCollector import org.apache.samza.system.SystemConsumers import org.apache.samza.system.SystemProducers import org.apache.samza.task.ReadableCoordinator -import org.apache.samza.metrics.Gauge import org.apache.samza.checkpoint.OffsetManager import org.apache.samza.SamzaException +import scala.collection.JavaConversions._ class TaskInstance( task: StreamTask, - partition: Partition, + val taskName: TaskName, config: Config, metrics: TaskInstanceMetrics, consumerMultiplexer: SystemConsumers, @@ -56,15 +50,14 @@ class TaskInstance( storageManager: TaskStorageManager = null, reporters: Map[String, MetricsReporter] = Map(), listeners: Seq[TaskLifecycleListener] = Seq(), - inputStreams: Set[SystemStream] = Set(), + val systemStreamPartitions: Set[SystemStreamPartition] = Set(), collector: ReadableCollector = new ReadableCollector) extends Logging { - val isInitableTask = task.isInstanceOf[InitableTask] val isWindowableTask = task.isInstanceOf[WindowableTask] val isClosableTask = task.isInstanceOf[ClosableTask] val context = new TaskContext { def getMetricsRegistry = metrics.registry - def getPartition = partition + def getSystemStreamPartitions = systemStreamPartitions def getStore(storeName: String) = if (storageManager != null) { storageManager(storeName) } else { @@ -72,29 +65,28 @@ 
class TaskInstance( null } + def getTaskName = taskName } def registerMetrics { - debug("Registering metrics for partition: %s." format partition) + debug("Registering metrics for taskName: %s" format taskName) reporters.values.foreach(_.register(metrics.source, metrics.registry)) } def registerOffsets { - debug("Registering offsets for partition: %s." format partition) + debug("Registering offsets for taskName: %s" format taskName) - inputStreams.foreach(systemStream => { - offsetManager.register(new SystemStreamPartition(systemStream, partition)) - }) + offsetManager.register(taskName, systemStreamPartitions) } def startStores { if (storageManager != null) { - debug("Starting storage manager for partition: %s." format partition) + debug("Starting storage manager for taskName: %s" format taskName) storageManager.init } else { - debug("Skipping storage manager initialization for partition: %s." format partition) + debug("Skipping storage manager initialization for taskName: %s" format taskName) } } @@ -102,31 +94,30 @@ class TaskInstance( listeners.foreach(_.beforeInit(config, context)) if (isInitableTask) { - debug("Initializing task for partition: %s." format partition) + debug("Initializing task for taskName: %s" format taskName) task.asInstanceOf[InitableTask].init(config, context) } else { - debug("Skipping task initialization for partition: %s." format partition) + debug("Skipping task initialization for taskName: %s" format taskName) } listeners.foreach(_.afterInit(config, context)) } def registerProducers { - debug("Registering producers for partition: %s." format partition) + debug("Registering producers for taskName: %s" format taskName) producerMultiplexer.register(metrics.source) } def registerConsumers { - debug("Registering consumers for partition: %s." 
format partition) + debug("Registering consumers for taskName: %s" format taskName) - inputStreams.foreach(systemStream => { - val systemStreamPartition = new SystemStreamPartition(systemStream, partition) + systemStreamPartitions.foreach(systemStreamPartition => { val offset = offsetManager.getStartingOffset(systemStreamPartition) - .getOrElse(throw new SamzaException("No offset defined for partition %s: %s" format (partition, systemStream))) + .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition)) consumerMultiplexer.register(systemStreamPartition, offset) - metrics.addOffsetGauge(systemStream, () => { + metrics.addOffsetGauge(systemStreamPartition, () => { offsetManager .getLastProcessedOffset(systemStreamPartition) .getOrElse(null) @@ -139,20 +130,20 @@ class TaskInstance( listeners.foreach(_.beforeProcess(envelope, config, context)) - trace("Processing incoming message envelope for partition: %s, %s" format (partition, envelope.getSystemStreamPartition)) + trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition)) task.process(envelope, collector, coordinator) listeners.foreach(_.afterProcess(envelope, config, context)) - trace("Updating offset map for partition: %s, %s, %s" format (partition, envelope.getSystemStreamPartition, envelope.getOffset)) + trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset)) offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset) } def window(coordinator: ReadableCoordinator) { if (isWindowableTask) { - trace("Windowing for partition: %s" format partition) + trace("Windowing for taskName: %s" format taskName) metrics.windows.inc @@ -162,48 +153,48 @@ class TaskInstance( def send { if (collector.envelopes.size > 0) { - trace("Sending messages for partition: %s, %s" format (partition, 
collector.envelopes.size)) + trace("Sending messages for taskName: %s, %s" format (taskName, collector.envelopes.size)) metrics.sends.inc metrics.messagesSent.inc(collector.envelopes.size) collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope)) - trace("Resetting collector for partition: %s" format partition) + trace("Resetting collector for taskName: %s" format taskName) collector.reset } else { - trace("Skipping send for partition %s because no messages were collected." format partition) + trace("Skipping send for taskName %s because no messages were collected." format taskName) metrics.sendsSkipped.inc } } def commit { - trace("Flushing state stores for partition: %s" format partition) + trace("Flushing state stores for taskName: %s" format taskName) metrics.commits.inc storageManager.flush - trace("Flushing producers for partition: %s" format partition) + trace("Flushing producers for taskName: %s" format taskName) producerMultiplexer.flush(metrics.source) - trace("Committing offset manager for partition: %s" format partition) + trace("Committing offset manager for taskName: %s" format taskName) - offsetManager.checkpoint(partition) + offsetManager.checkpoint(taskName) } def shutdownTask { listeners.foreach(_.beforeClose(config, context)) if (task.isInstanceOf[ClosableTask]) { - debug("Shutting down stream task for partition: %s" format partition) + debug("Shutting down stream task for taskName: %s" format taskName) task.asInstanceOf[ClosableTask].close } else { - debug("Skipping stream task shutdown for partition: %s" format partition) + debug("Skipping stream task shutdown for taskName: %s" format taskName) } listeners.foreach(_.afterClose(config, context)) @@ -211,16 +202,16 @@ class TaskInstance( def shutdownStores { if (storageManager != null) { - debug("Shutting down storage manager for partition: %s" format partition) + debug("Shutting down storage manager for taskName: %s" format taskName) storageManager.stop } else { 
- debug("Skipping storage manager shutdown for partition: %s" format partition) + debug("Skipping storage manager shutdown for taskName: %s" format taskName) } } - override def toString() = "TaskInstance for class %s and partition %s." format (task.getClass.getName, partition) + override def toString() = "TaskInstance for class %s and taskName %s." format (task.getClass.getName, taskName) - def toDetailedString() = "TaskInstance [windowable=%s, closable=%s, collector_size=%s]" format (isWindowableTask, isClosableTask, collector.envelopes.size) + def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s, collector_size=%s]" format (taskName, isWindowableTask, isClosableTask, collector.envelopes.size) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala index 7502124..aae3f87 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala @@ -21,10 +21,8 @@ package org.apache.samza.container import org.apache.samza.metrics.ReadableMetricsRegistry import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.Partition import org.apache.samza.metrics.MetricsHelper -import org.apache.samza.system.SystemStream -import org.apache.samza.metrics.Gauge +import org.apache.samza.system.SystemStreamPartition class TaskInstanceMetrics( val source: String = "unknown", @@ -37,7 +35,7 @@ class TaskInstanceMetrics( val sendsSkipped = newCounter("send-skipped") val messagesSent = newCounter("messages-sent") - def addOffsetGauge(systemStream: SystemStream, getValue: () => String) { - 
newGauge("%s-%s-offset" format (systemStream.getSystem, systemStream.getStream), getValue) + def addOffsetGauge(systemStreamPartition: SystemStreamPartition, getValue: () => String) { + newGauge("%s-%s-%d-offset" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), getValue) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala new file mode 100644 index 0000000..427119e --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.container + +import grizzled.slf4j.Logging +import org.apache.samza.SamzaException +import org.apache.samza.system.{SystemStream, SystemStreamPartition} +import scala.collection.{immutable, Map, MapLike} + +/** + * Map of {@link TaskName} to its set of {@link SystemStreamPartition}s with additional methods for aggregating + * those SystemStreamPartitions' individual system, streams and partitions. Is useful for highlighting this + * particular, heavily used map within the code. + * + * @param m Original map of TaskNames to SystemStreamPartitions + */ +class TaskNamesToSystemStreamPartitions(m:Map[TaskName, Set[SystemStreamPartition]] = Map[TaskName, Set[SystemStreamPartition]]()) + extends Map[TaskName, Set[SystemStreamPartition]] + with MapLike[TaskName, Set[SystemStreamPartition], TaskNamesToSystemStreamPartitions] with Logging { + + // Constructor + validate + + // Methods + + // TODO: Get rid of public constructor, rely entirely on the companion object + override def -(key: TaskName): TaskNamesToSystemStreamPartitions = new TaskNamesToSystemStreamPartitions(m - key) + + override def +[B1 >: Set[SystemStreamPartition]](kv: (TaskName, B1)): Map[TaskName, B1] = new TaskNamesToSystemStreamPartitions(m + kv.asInstanceOf[(TaskName, Set[SystemStreamPartition])]) + + override def iterator: Iterator[(TaskName, Set[SystemStreamPartition])] = m.iterator + + override def get(key: TaskName): Option[Set[SystemStreamPartition]] = m.get(key) + + override def empty: TaskNamesToSystemStreamPartitions = new TaskNamesToSystemStreamPartitions() + + override def seq: Map[TaskName, Set[SystemStreamPartition]] = m.seq + + override def foreach[U](f: ((TaskName, Set[SystemStreamPartition])) => U): Unit = m.foreach(f) + + override def size: Int = m.size + + /** + * Validate that this is a legal mapping of TaskNames to SystemStreamPartitions. At the moment, + * we only check that an SSP is included in the mapping at most once. 
We could add other, + * pluggable validations here, or if we decided to allow an SSP to appear in the mapping more than + * once, remove this limitation. + */ + def validate():Unit = { + // Convert sets of SSPs to lists, to preserve duplicates + val allSSPs: List[SystemStreamPartition] = m.values.toList.map(_.toList).flatten + val sspCountMap = allSSPs.groupBy(ssp => ssp) // Group all the SSPs together + .map(ssp => (ssp._1 -> ssp._2.size)) // Turn into map -> count of that SSP + .filter(ssp => ssp._2 != 1) // Filter out those that appear once + + if(!sspCountMap.isEmpty) { + throw new SamzaException("Assigning the same SystemStreamPartition to multiple TaskNames is not currently supported." + + " Out of compliance SystemStreamPartitions and counts: " + sspCountMap) + } + + debug("Successfully validated TaskName to SystemStreamPartition set mapping:" + m) + } + + /** + * Return a set of all the SystemStreamPartitions for all the keys. + * + * @return All SystemStreamPartitions within this map + */ + def getAllSSPs(): Iterable[SystemStreamPartition] = m.values.flatten + + /** + * Return a set of all the Systems present in the SystemStreamPartitions across all the keys + * + * @return All Systems within this map + */ + def getAllSystems(): Set[String] = getAllSSPs.map(_.getSystemStream.getSystem).toSet + + /** + * Return a set of all the Partition IDs in the SystemStreamPartitions across all the keys + * + * @return All Partition IDs within this map + */ + def getAllPartitionIds(): Set[Int] = getAllSSPs.map(_.getPartition.getPartitionId).toSet + + /** + * Return a set of all the Streams in the SystemStreamPartitions across all the keys + * + * @return All Streams within this map + */ + def getAllStreams(): Set[String] = getAllSSPs.map(_.getSystemStream.getStream).toSet + + /** + * Return a set of all the SystemStreams in the SystemStreamPartitions across all the keys + * + * @return All SystemStreams within this map + */ + def getAllSystemStreams: Set[SystemStream]
= getAllSSPs().map(_.getSystemStream).toSet + + // CommandBuilder needs to get a copy of this map and is a Java interface, therefore we can't just go straight + // from this type to JSON (for passing into the command option). + // Not super crazy about having the Java -> Scala and Scala -> Java methods in two different (but close) places: + // here and in the apply method on the companion object. May be better to just have a conversion util, but would + // be less clean. Life is cruel on the border of Scalapolis and Javatown. + def getJavaFriendlyType: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]] = { + import scala.collection.JavaConverters._ + + m.map({case(k,v) => k -> v.asJava}).toMap.asJava + } +} + +object TaskNamesToSystemStreamPartitions { + def apply() = new TaskNamesToSystemStreamPartitions() + + def apply(m: Map[TaskName, Set[SystemStreamPartition]]) = new TaskNamesToSystemStreamPartitions(m) + + /** + * Convert from Java-happy type we obtain from the SSPTaskName factory + * + * @param m Java version of a map of TaskNames to sets of SystemStreamPartitions + * @return Populated SSPTaskName map + */ + def apply(m: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]]) = { + import scala.collection.JavaConversions._ + + val rightType: immutable.Map[TaskName, Set[SystemStreamPartition]] = m.map({case(k,v) => k -> v.toSet}).toMap + + new TaskNamesToSystemStreamPartitions(rightType) + } +}
