SAMZA-676: Implemented broadcast stream
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1f77f8b9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1f77f8b9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1f77f8b9 Branch: refs/heads/master Commit: 1f77f8b986e1e58982704b71ea9bb497ac0f70df Parents: 79ec5db Author: Yan Fang <[email protected]> Authored: Mon Aug 24 10:46:16 2015 -0400 Committer: Yan Fang <[email protected]> Committed: Mon Aug 24 10:46:16 2015 -0400 ---------------------------------------------------------------------- checkstyle/import-control.xml | 6 + .../versioned/container/samza-container.md | 10 + .../versioned/jobs/configuration-table.html | 16 ++ .../org/apache/samza/system/SystemAdmin.java | 10 + ...inglePartitionWithoutOffsetsSystemAdmin.java | 4 + .../org/apache/samza/config/TaskConfigJava.java | 84 ++++++++ .../grouper/stream/GroupByPartition.java | 83 ++++++++ .../grouper/stream/GroupByPartitionFactory.java | 29 +++ .../stream/GroupBySystemStreamPartition.java | 80 ++++++++ .../GroupBySystemStreamPartitionFactory.java | 31 +++ .../apache/samza/checkpoint/OffsetManager.scala | 201 ++++++++++++------- .../org/apache/samza/config/JobConfig.scala | 2 +- .../org/apache/samza/container/RunLoop.scala | 23 ++- .../apache/samza/container/SamzaContainer.scala | 5 +- .../apache/samza/container/TaskInstance.scala | 71 +++++-- .../samza/container/TaskInstanceMetrics.scala | 1 + .../grouper/stream/GroupByPartition.scala | 41 ---- .../stream/GroupBySystemStreamPartition.scala | 38 ---- .../filereader/FileReaderSystemAdmin.scala | 4 + .../apache/samza/config/TestTaskConfigJava.java | 61 ++++++ .../grouper/stream/TestGroupByPartition.java | 130 ++++++++++++ .../TestGroupBySystemStreamPartition.java | 104 ++++++++++ .../samza/checkpoint/TestOffsetManager.scala | 39 ++-- .../apache/samza/container/TestRunLoop.scala | 16 ++ .../samza/container/TestSamzaContainer.scala | 1 + .../samza/container/TestTaskInstance.scala 
| 92 ++++++++- .../grouper/stream/GroupByTestBase.scala | 58 ------ .../grouper/stream/TestGroupByPartition.scala | 39 ---- .../TestGroupBySystemStreamPartition.scala | 42 ---- .../samza/coordinator/TestJobCoordinator.scala | 2 + .../elasticsearch/ElasticsearchSystemAdmin.java | 5 + .../samza/system/hdfs/HdfsSystemAdmin.scala | 3 + .../samza-hdfs-test-batch-job-text.properties | 17 ++ .../samza-hdfs-test-batch-job.properties | 17 ++ .../samza-hdfs-test-job-text.properties | 17 ++ .../resources/samza-hdfs-test-job.properties | 16 ++ .../samza/system/kafka/KafkaSystemAdmin.scala | 34 ++-- .../system/kafka/KafkaSystemConsumer.scala | 9 +- .../samza/system/kafka/KafkaSystemFactory.scala | 1 + .../system/kafka/TestKafkaSystemConsumer.scala | 36 +++- .../samza/system/mock/MockSystemAdmin.java | 5 + .../yarn/TestSamzaAppMasterTaskManager.scala | 2 + 42 files changed, 1117 insertions(+), 368 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index aaa235a..bc07ae8 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -33,6 +33,10 @@ <subpackage name="config"> <allow class="org.apache.samza.SamzaException" /> + <allow pkg="org.apache.samza.system" /> + <allow pkg="org.apache.samza.util" /> + + <allow class="org.apache.samza.Partition" /> </subpackage> <subpackage name="serializers"> @@ -113,6 +117,8 @@ <subpackage name="stream"> <allow pkg="org.apache.samza.container" /> <allow pkg="org.apache.samza.system" /> + + <allow class="org.apache.samza.Partition" /> </subpackage> <subpackage name="task"> http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/docs/learn/documentation/versioned/container/samza-container.md 
---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/samza-container.md b/docs/learn/documentation/versioned/container/samza-container.md index 9f46414..f97e8a3 100644 --- a/docs/learn/documentation/versioned/container/samza-container.md +++ b/docs/learn/documentation/versioned/container/samza-container.md @@ -102,4 +102,14 @@ Thus, if you want two events in different streams to be processed by the same ta There is one caveat in all of this: Samza currently assumes that a stream's partition count will never change. Partition splitting or repartitioning is not supported. If an input stream has N partitions, it is expected that it has always had, and will always have N partitions. If you want to re-partition a stream, you can write a job that reads messages from the stream, and writes them out to a new stream with the required number of partitions. For example, you could read messages from PageViewEvent, and write them to PageViewEventRepartition. +### Broadcast Streams + +After 0.10.0, Samza supports broadcast streams. You can assign partitions from some streams to all the tasks. For example, if you want all the tasks to consume partitions 0 and 1 from a stream called broadcast-stream-1, and partition 2 from a stream called broadcast-stream-2, you can configure: + +{% highlight jproperties %} +task.broadcast.inputs=yourSystem.broadcast-stream-1#[0-1], yourSystem.broadcast-stream-2#2 +{% endhighlight %} + +If you use "[]", you are specifying an inclusive range of partitions; for example, "[0-1]" means partitions 0 and 1.
+ ## [Streams »](streams.html) http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 8177fe5..78f2927 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -447,6 +447,22 @@ </tr> <tr> + <td class="property" id="task-broadcast-inputs">task.broadcast.inputs</td> + <td class="default"></td> + <td class="description"> + This property specifies the partitions that all tasks should consume. The systemStreamPartitions you put + here will be sent to all the tasks. + <dl> + <dt>Format: <span class="system">system-name</span>.<span class="stream">stream-name</span>#<i>partitionId</i> + or <span class="system">system-name</span>.<span class="stream">stream-name</span>#[<i>startingPartitionId</i>-<i>endingPartitionId</i>]</dt> + </dl> + <dl> + <dt>Example: <code>task.broadcast.inputs=mySystem.globalStream#[1-2], mySystem.anotherGlobalStream#1</code></dt> + </dl> + </td> + </tr> + + <tr> <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th> </tr> http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index a920a10..bc926c5 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -79,4 +79,14 @@ public interface SystemAdmin { */ void
createCoordinatorStream(String streamName); + /** + * Compare the two offsets. -1, 0, +1 means offset1 < offset2, + * offset1 == offset2 and offset1 > offset2 respectively. Return + * null if those two offsets are not comparable + * + * @param offset1 + * @param offset2 + * @return -1 if offset1 < offset2; 0 if offset1 == offset2; 1 if offset1 > offset2. Null if not comparable + */ + Integer offsetComparator(String offset1, String offset2); } http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java index 63a1666..2157e69 100644 --- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java @@ -81,4 +81,8 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin { throw new UnsupportedOperationException("Single partition admin can't create coordinator streams."); } + @Override + public Integer offsetComparator(String offset1, String offset2) { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java new file mode 100644 index 0000000..015e994 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.samza.Partition; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TaskConfigJava extends MapConfig { + // broadcast streams consumed by all tasks. e.g. kafka.foo#1 + public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs"; + private static final String BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#[\\d]+"; + private static final String BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[[\\d]+\\-[\\d]+\\]+"; + public static final Logger LOGGER = LoggerFactory.getLogger(TaskConfigJava.class); + + + public TaskConfigJava(Config config) { + super(config); + } + + /** + * Get the systemStreamPartitions of the broadcast stream. Specifying + * one partition for one stream or a range of the partitions for one + * stream is allowed. 
+ * + * @return a Set of SystemStreamPartitions + */ + public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() { + HashSet<SystemStreamPartition> systemStreamPartitionSet = new HashSet<SystemStreamPartition>(); + List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS); + + for (String systemStreamPartition : systemStreamPartitions) { + if (Pattern.matches(BROADCAST_STREAM_PATTERN, systemStreamPartition)) { + + int hashPosition = systemStreamPartition.indexOf("#"); + SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamPartition.substring(0, hashPosition)); + systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.valueOf(systemStreamPartition.substring(hashPosition + 1))))); + + } else if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, systemStreamPartition)) { + + SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamPartition.substring(0, systemStreamPartition.indexOf("#"))); + + int startingPartition = Integer.valueOf(systemStreamPartition.substring(systemStreamPartition.indexOf("[") + 1, systemStreamPartition.lastIndexOf("-"))); + int endingPartition = Integer.valueOf(systemStreamPartition.substring(systemStreamPartition.lastIndexOf("-") + 1, systemStreamPartition.indexOf("]"))); + + if (startingPartition > endingPartition) { + LOGGER.warn("The starting partition in stream " + systemStream.toString() + " is bigger than the ending Partition. No partition is added"); + } + for (int i = startingPartition; i <= endingPartition; i++) { + systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(i))); + } + } else { + throw new IllegalArgumentException("incorrect format in " + systemStreamPartition + + ". 
Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'"); + } + } + return systemStreamPartitionSet; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java new file mode 100644 index 0000000..3022b72 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.container.grouper.stream; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.samza.config.Config; +import org.apache.samza.config.TaskConfigJava; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemStreamPartition; + +public class GroupByPartition implements SystemStreamPartitionGrouper { + private TaskConfigJava taskConfig = null; + private Set<SystemStreamPartition> broadcastStreams = new HashSet<SystemStreamPartition>(); + + /** + * default constructor + */ + public GroupByPartition() { + } + + /** + * Accepts the config in the constructor + * + * @param config job's config + */ + public GroupByPartition(Config config) { + if (config.containsKey(TaskConfigJava.BROADCAST_INPUT_STREAMS)) { + taskConfig = new TaskConfigJava(config); + this.broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions(); + } + } + + @Override + public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) { + Map<TaskName, Set<SystemStreamPartition>> groupedMap = new HashMap<TaskName, Set<SystemStreamPartition>>(); + + for (SystemStreamPartition ssp : ssps) { + // skip the broadcast streams if there is any + if (broadcastStreams.contains(ssp)) { + continue; + } + + TaskName taskName = new TaskName("Partition " + ssp.getPartition().getPartitionId()); + if (!groupedMap.containsKey(taskName)) { + groupedMap.put(taskName, new HashSet<SystemStreamPartition>()); + } + groupedMap.get(taskName).add(ssp); + } + + // assign the broadcast streams to all the taskNames + if (!broadcastStreams.isEmpty()) { + for (Set<SystemStreamPartition> value : groupedMap.values()) { + for (SystemStreamPartition ssp : broadcastStreams) { + value.add(ssp); + } + } + } + + return groupedMap; + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java new file mode 100644 index 0000000..608508e --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.container.grouper.stream; + +import org.apache.samza.config.Config; + +public class GroupByPartitionFactory implements SystemStreamPartitionGrouperFactory { + @Override + public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) { + return new GroupByPartition(config); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java new file mode 100644 index 0000000..a8b41de --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.container.grouper.stream; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.samza.config.Config; +import org.apache.samza.config.TaskConfigJava; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemStreamPartition; + +public class GroupBySystemStreamPartition implements SystemStreamPartitionGrouper { + private TaskConfigJava taskConfig = null; + private Set<SystemStreamPartition> broadcastStreams = new HashSet<SystemStreamPartition>(); + + /** + * default constructor + */ + public GroupBySystemStreamPartition() { + } + + /** + * A constructor that accepts job config as the parameter + * + * @param config job config + */ + public GroupBySystemStreamPartition(Config config) { + if (config.containsKey(TaskConfigJava.BROADCAST_INPUT_STREAMS)) { + taskConfig = new TaskConfigJava(config); + broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions(); + } + } + + @Override + public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) { + Map<TaskName, Set<SystemStreamPartition>> groupedMap = new HashMap<TaskName, Set<SystemStreamPartition>>(); + + for (SystemStreamPartition ssp : ssps) { + if (broadcastStreams.contains(ssp)) { + continue; + } + + HashSet<SystemStreamPartition> sspSet = new HashSet<SystemStreamPartition>(); + sspSet.add(ssp); + groupedMap.put(new TaskName(ssp.toString()), sspSet); + } + + // assign the broadcast streams to all the taskNames + if (!broadcastStreams.isEmpty()) { + for (Set<SystemStreamPartition> value : groupedMap.values()) { + for (SystemStreamPartition ssp : broadcastStreams) { + value.add(ssp); + } + } + } + + return groupedMap; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java ---------------------------------------------------------------------- 
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java new file mode 100644 index 0000000..04a7444 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.container.grouper.stream; + +import org.apache.samza.config.Config; + +public class GroupBySystemStreamPartitionFactory implements SystemStreamPartitionGrouperFactory { + + @Override + public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) { + return new GroupBySystemStreamPartition(config); /* forward the job config so task.broadcast.inputs is honored, as GroupByPartitionFactory does */ + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 20e5d26..1464acc 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -72,8 +72,8 @@ object OffsetManager extends Logging { config: Config, checkpointManager: CheckpointManager = null, systemAdmins: Map[String, SystemAdmin] = Map(), - offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics, - latestOffsets: Map[SystemStreamPartition, String] = Map()) = { + offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics, + latestOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) = { debug("Building offset manager for %s." format systemStreamMetadata) val offsetSettings = systemStreamMetadata @@ -142,25 +142,28 @@ class OffsetManager( /** * offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
*/ - val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics, + val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics, /* * The previously read checkpoints restored from the coordinator stream */ - val previousCheckpointedOffsets: Map[SystemStreamPartition, String] = Map() - ) extends Logging { + val previousCheckpointedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) extends Logging { /** * Last offsets processed for each SystemStreamPartition. */ // Filter out null offset values, we can't use them, these exist only because of SSP information - var lastProcessedOffsets = previousCheckpointedOffsets.filter(_._2 != null) + var lastProcessedOffsets = previousCheckpointedOffsets.map { + case (taskName, sspToOffset) => { + taskName -> sspToOffset.filter(_._2 != null) + } + } /** * Offsets to start reading from for each SystemStreamPartition. This * variable is populated after all checkpoints have been restored. */ - var startingOffsets = Map[SystemStreamPartition, String]() + var startingOffsets = Map[TaskName, Map[SystemStreamPartition, String]]() /** * The set of system stream partitions that have been registered with the @@ -172,7 +175,7 @@ class OffsetManager( def register(taskName: TaskName, systemStreamPartitionsToRegister: Set[SystemStreamPartition]) { systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()).addAll(systemStreamPartitionsToRegister) // register metrics - systemStreamPartitions.foreach{ case (taskName, ssp) => ssp.foreach (ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) } + systemStreamPartitions.foreach { case (taskName, ssp) => ssp.foreach(ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) } } def start { @@ -190,23 +193,32 @@ class OffsetManager( /** * Set the last processed offset for a given SystemStreamPartition. 
*/ - def update(systemStreamPartition: SystemStreamPartition, offset: String) { - lastProcessedOffsets += systemStreamPartition -> offset + def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) { + lastProcessedOffsets.get(taskName) match { + case Some(sspToOffsets) => lastProcessedOffsets += taskName -> (sspToOffsets + (systemStreamPartition -> offset)) + case None => lastProcessedOffsets += (taskName -> Map(systemStreamPartition -> offset)) + } } /** * Get the last processed offset for a SystemStreamPartition. */ - def getLastProcessedOffset(systemStreamPartition: SystemStreamPartition) = { - lastProcessedOffsets.get(systemStreamPartition) + def getLastProcessedOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition) = { + lastProcessedOffsets.get(taskName) match { + case Some(sspToOffsets) => sspToOffsets.get(systemStreamPartition) + case None => None + } } /** * Get the starting offset for a SystemStreamPartition. This is the offset * where a SamzaContainer begins reading from when it starts up. */ - def getStartingOffset(systemStreamPartition: SystemStreamPartition) = { - startingOffsets.get(systemStreamPartition) + def getStartingOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition) = { + startingOffsets.get(taskName) match { + case Some(sspToOffsets) => sspToOffsets.get(systemStreamPartition) + case None => None + } } /** @@ -217,10 +229,19 @@ class OffsetManager( debug("Checkpointing offsets for taskName %s." 
format taskName) val sspsForTaskName = systemStreamPartitions.getOrElse(taskName, throw new SamzaException("No such SystemStreamPartition set " + taskName + " registered for this checkpointmanager")).toSet - val partitionOffsets = lastProcessedOffsets.filterKeys(sspsForTaskName.contains(_)) + val partitionOffsets = lastProcessedOffsets.get(taskName) match { + case Some(sspToOffsets) => sspToOffsets.filterKeys(sspsForTaskName.contains(_)) + case None => { + warn(taskName + " is not found... ") + Map[SystemStreamPartition, String]() + } + } checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets)) - lastProcessedOffsets.foreach{ case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) } + lastProcessedOffsets.get(taskName) match { + case Some(sspToOffsets) => sspToOffsets.foreach { case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) } + case None => + } } else { debug("Skipping checkpointing for taskName %s because no checkpoint manager is defined." format taskName) } @@ -262,7 +283,9 @@ class OffsetManager( */ private def loadOffsets { debug("Loading offsets") - lastProcessedOffsets.filter { + lastProcessedOffsets.map { + case (taskName, sspToOffsets) => { + taskName -> sspToOffsets.filter { case (systemStreamPartition, offset) => val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream) if (!shouldKeep) { @@ -271,7 +294,8 @@ class OffsetManager( info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition)) shouldKeep } - + } + } } /** @@ -279,27 +303,43 @@ class OffsetManager( * reset using resetOffsets. 
*/ private def stripResetStreams { - val systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets.keys) - - systemStreamPartitionsToReset.foreach(systemStreamPartition => { - val offset = lastProcessedOffsets(systemStreamPartition) - info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition)) - }) + val systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets) + + systemStreamPartitionsToReset.foreach { + case (taskName, systemStreamPartitions) => { + systemStreamPartitions.foreach { + systemStreamPartition => + { + val offset = lastProcessedOffsets(taskName).get(systemStreamPartition) + info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition)) + } + } + } + } - lastProcessedOffsets --= systemStreamPartitionsToReset + lastProcessedOffsets = lastProcessedOffsets.map { + case (taskName, sspToOffsets) => { + taskName -> (sspToOffsets -- systemStreamPartitionsToReset(taskName)) + } + } } /** - * Returns a set of all SystemStreamPartitions in lastProcessedOffsets that need to be reset + * Returns a map of all SystemStreamPartitions in lastProcessedOffsets that need to be reset */ - private def getSystemStreamPartitionsToReset(systemStreamPartitions: Iterable[SystemStreamPartition]): Set[SystemStreamPartition] = { - systemStreamPartitions - .filter(systemStreamPartition => { - val systemStream = systemStreamPartition.getSystemStream - offsetSettings - .getOrElse(systemStream, throw new SamzaException("Attempting to reset a stream that doesn't have offset settings %s." 
format systemStream)) - .resetOffset - }).toSet + private def getSystemStreamPartitionsToReset(taskNameTosystemStreamPartitions: Map[TaskName, Map[SystemStreamPartition, String]]): Map[TaskName, Set[SystemStreamPartition]] = { + taskNameTosystemStreamPartitions.map { + case (taskName, sspToOffsets) => { + taskName -> (sspToOffsets.filter { + case (systemStreamPartition, offset) => { + val systemStream = systemStreamPartition.getSystemStream + offsetSettings + .getOrElse(systemStream, throw new SamzaException("Attempting to reset a stream that doesn't have offset settings %s." format systemStream)) + .resetOffset + } + }.keys.toSet) + } + } } /** @@ -307,16 +347,18 @@ class OffsetManager( * SystemStreamPartition, and populate startingOffsets. */ private def loadStartingOffsets { - startingOffsets ++= lastProcessedOffsets - // Group offset map according to systemName. - .groupBy(_._1.getSystem) - // Get next offsets for each system. - .flatMap { - case (systemName, systemStreamPartitionOffsets) => - systemAdmins - .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName)) - .getOffsetsAfter(systemStreamPartitionOffsets) + startingOffsets = lastProcessedOffsets.map { + case (taskName, sspToOffsets) => { + taskName -> { + sspToOffsets.groupBy(_._1.getSystem).flatMap { + case (systemName, systemStreamPartitionOffsets) => + systemAdmins + .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName)) + .getOffsetsAfter(systemStreamPartitionOffsets) + } + } } + } } /** @@ -324,42 +366,47 @@ class OffsetManager( * that was registered, but has no offset. 
*/ private def loadDefaults { - val allSSPs: Set[SystemStreamPartition] = systemStreamPartitions - .values - .flatten - .toSet - - allSSPs.foreach(systemStreamPartition => { - if (!startingOffsets.contains(systemStreamPartition)) { - val systemStream = systemStreamPartition.getSystemStream - val partition = systemStreamPartition.getPartition - val offsetSetting = offsetSettings.getOrElse(systemStream, throw new SamzaException("Attempting to load defaults for stream %s, which has no offset settings." format systemStream)) - val systemStreamMetadata = offsetSetting.metadata - val offsetType = offsetSetting.defaultOffset - - debug("Got default offset type %s for %s" format (offsetType, systemStreamPartition)) - - val systemStreamPartitionMetadata = systemStreamMetadata - .getSystemStreamPartitionMetadata - .get(partition) - - if (systemStreamPartitionMetadata != null) { - val nextOffset = { - val requested = systemStreamPartitionMetadata.getOffset(offsetType) - - if (requested == null) { - warn("Requested offset type %s in %s, but the stream is empty. Defaulting to the upcoming offset." format (offsetType, systemStreamPartition)) - systemStreamPartitionMetadata.getOffset(OffsetType.UPCOMING) - } else requested + val taskNameToSSPs: Map[TaskName, Set[SystemStreamPartition]] = systemStreamPartitions + + taskNameToSSPs.foreach { + case (taskName, systemStreamPartitions) => { + systemStreamPartitions.foreach { systemStreamPartition => + if (!startingOffsets.contains(taskName) || !startingOffsets(taskName).contains(systemStreamPartition)) { + val systemStream = systemStreamPartition.getSystemStream + val partition = systemStreamPartition.getPartition + val offsetSetting = offsetSettings.getOrElse(systemStream, throw new SamzaException("Attempting to load defaults for stream %s, which has no offset settings." 
format systemStream)) + val systemStreamMetadata = offsetSetting.metadata + val offsetType = offsetSetting.defaultOffset + + debug("Got default offset type %s for %s" format (offsetType, systemStreamPartition)) + + val systemStreamPartitionMetadata = systemStreamMetadata + .getSystemStreamPartitionMetadata + .get(partition) + + if (systemStreamPartitionMetadata != null) { + val nextOffset = { + val requested = systemStreamPartitionMetadata.getOffset(offsetType) + + if (requested == null) { + warn("Requested offset type %s in %s, but the stream is empty. Defaulting to the upcoming offset." format (offsetType, systemStreamPartition)) + systemStreamPartitionMetadata.getOffset(OffsetType.UPCOMING) + } else requested + } + + debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition)) + + startingOffsets.get(taskName) match { + case Some(sspToOffsets) => startingOffsets += taskName -> (sspToOffsets + (systemStreamPartition -> nextOffset)) + case None => startingOffsets += taskName -> Map(systemStreamPartition -> nextOffset) + } + + } else { + throw new SamzaException("No metadata available for partition %s." format systemStreamPartitionMetadata) + } } - - debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition)) - - startingOffsets += systemStreamPartition -> nextOffset - } else { - throw new SamzaException("No metadata available for partition %s." 
format systemStreamPartitionMetadata) } } - }) + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index e4b14f4..6d73bb9 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -19,10 +19,10 @@ package org.apache.samza.config -import org.apache.samza.container.grouper.stream.GroupByPartitionFactory import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.util.Logging +import org.apache.samza.container.grouper.stream.GroupByPartitionFactory object JobConfig { // job config constants http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index 24da35f..6916c5c 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -51,14 +51,17 @@ class RunLoop( // Messages come from the chooser with no connection to the TaskInstance they're bound for. // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them. 
- val systemStreamPartitionToTaskInstance: Map[SystemStreamPartition, TaskInstance] = { + val systemStreamPartitionToTaskInstances = getSystemStreamPartitionToTaskInstancesMapping + + def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance]] = { // We could just pass in the SystemStreamPartitionMap during construction, but it's safer and cleaner to derive the information directly def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap - taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.toMap + taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.groupBy(_._1).map { + case (ssp, ssp2taskInstance) => ssp -> ssp2taskInstance.map(_._2).toList + } } - /** * Starts the run loop. Blocks until either the tasks request shutdown, or an * unhandled exception is thrown. @@ -111,11 +114,15 @@ class RunLoop( trace("Processing incoming message envelope for SSP %s." 
format ssp) metrics.envelopes.inc - val taskInstance = systemStreamPartitionToTaskInstance(ssp) - val coordinator = new ReadableCoordinator(taskInstance.taskName) - - taskInstance.process(envelope, coordinator) - checkCoordinator(coordinator) + val taskInstances = systemStreamPartitionToTaskInstances(ssp) + taskInstances.foreach { + taskInstance => + { + val coordinator = new ReadableCoordinator(taskInstance.taskName) + taskInstance.process(envelope, coordinator) + checkCoordinator(coordinator) + } + } } else { trace("No incoming message envelope was available.") metrics.nullEnvelopes.inc http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 85b012b..61e228b 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -335,8 +335,8 @@ object SamzaContainer extends Logging { info("Got checkpoint manager: %s" format checkpointManager) - val combinedOffsets: Map[SystemStreamPartition, String] = - containerModel.getTasks.values().flatMap(_.getCheckpointedOffsets).toMap + val combinedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = + containerModel.getTasks.map{case (taskName, taskModel) => taskName -> mapAsScalaMap(taskModel.getCheckpointedOffsets).toMap }.toMap val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets) @@ -508,6 +508,7 @@ object SamzaContainer extends Logging { taskName = taskName, config = config, metrics = taskInstanceMetrics, + systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, collector = collector, containerContext = 
containerContext, http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index c5a5ea5..d32a929 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -36,14 +36,15 @@ import org.apache.samza.task.StreamTask import org.apache.samza.task.ReadableCoordinator import org.apache.samza.task.TaskInstanceCollector import org.apache.samza.util.Logging - import scala.collection.JavaConversions._ +import org.apache.samza.system.SystemAdmin class TaskInstance( task: StreamTask, val taskName: TaskName, config: Config, metrics: TaskInstanceMetrics, + systemAdmins: Map[String, SystemAdmin], consumerMultiplexer: SystemConsumers, collector: TaskInstanceCollector, containerContext: SamzaContainerContext, @@ -69,9 +70,15 @@ class TaskInstance( def getSamzaContainerContext = containerContext override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = { - offsetManager.startingOffsets += (ssp -> offset) + val startingOffsets = offsetManager.startingOffsets + offsetManager.startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset)) } } + // store the (ssp -> if this ssp is catched up) mapping. "catched up" + // means the same ssp in other taskInstances have the same offset as + // the one here. 
+ var ssp2catchedupMapping: scala.collection.mutable.Map[SystemStreamPartition, Boolean] = scala.collection.mutable.Map[SystemStreamPartition, Boolean]() + systemStreamPartitions.foreach(ssp2catchedupMapping += _ -> false) def registerMetrics { debug("Registering metrics for taskName: %s" format taskName) @@ -115,13 +122,13 @@ class TaskInstance( debug("Registering consumers for taskName: %s" format taskName) systemStreamPartitions.foreach(systemStreamPartition => { - val offset = offsetManager.getStartingOffset(systemStreamPartition) - .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition)) + val offset = offsetManager.getStartingOffset(taskName, systemStreamPartition) + .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition)) consumerMultiplexer.register(systemStreamPartition, offset) metrics.addOffsetGauge(systemStreamPartition, () => { offsetManager - .getLastProcessedOffset(systemStreamPartition) - .getOrElse(null) + .getLastProcessedOffset(taskName, systemStreamPartition) + .orNull }) }) } @@ -129,15 +136,24 @@ class TaskInstance( def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) { metrics.processes.inc - trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition)) - - exceptionHandler.maybeHandle { - task.process(envelope, collector, coordinator) + if (!ssp2catchedupMapping.getOrElse(envelope.getSystemStreamPartition, + throw new SamzaException(envelope.getSystemStreamPartition + " is not registered!"))) { + checkCaughtUp(envelope) } - trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset)) + if (ssp2catchedupMapping(envelope.getSystemStreamPartition)) { + metrics.messagesActuallyProcessed.inc + + trace("Processing incoming message envelope for taskName and 
SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition)) + + exceptionHandler.maybeHandle { + task.process(envelope, collector, coordinator) + } + + trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset)) - offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset) + offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset) + } } def window(coordinator: ReadableCoordinator) { @@ -193,4 +209,35 @@ class TaskInstance( override def toString() = "TaskInstance for class %s and taskName %s." format (task.getClass.getName, taskName) def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s]" format (taskName, isWindowableTask, isClosableTask) + + /** + * From the envelope, check if this SSP has catched up with the starting offset of the SSP + * in this TaskInstance. If the offsets are not comparable, default to true, which means + * it's already catched-up. + */ + private def checkCaughtUp(envelope: IncomingMessageEnvelope) = { + systemAdmins match { + case null => { + warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up") + ssp2catchedupMapping(envelope.getSystemStreamPartition) = true + } + case others => { + val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition) + .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition)) + val system = envelope.getSystemStreamPartition.getSystem + others(system).offsetComparator(envelope.getOffset, startingOffset) match { + case null => { + info("offsets in " + system + " is not comparable. 
Set all SystemStreamPartitions to catched-up") + ssp2catchedupMapping(envelope.getSystemStreamPartition) = true // not comparable + } + case result => { + if (result >= 0) { + info(envelope.getSystemStreamPartition.toString + " is catched up.") + ssp2catchedupMapping(envelope.getSystemStreamPartition) = true + } + } + } + } + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala index 9dc7051..8b86388 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala @@ -31,6 +31,7 @@ class TaskInstanceMetrics( val commits = newCounter("commit-calls") val windows = newCounter("window-calls") val processes = newCounter("process-calls") + val messagesActuallyProcessed = newCounter("messages-actually-processed") val sends = newCounter("send-calls") val flushes = newCounter("flush-calls") val messagesSent = newCounter("messages-sent") http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala deleted file mode 100644 index 44e95fc..0000000 --- a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container.grouper.stream - -import org.apache.samza.container.TaskName -import java.util -import org.apache.samza.system.SystemStreamPartition -import scala.collection.JavaConverters._ -import scala.collection.JavaConversions._ -import org.apache.samza.config.Config - -/** - * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being - * the string representation of the Partition. 
- */ -class GroupByPartition extends SystemStreamPartitionGrouper { - override def group(ssps: util.Set[SystemStreamPartition]) = { - ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) ) - .map(r => r._1 -> r._2.asJava) - } -} - -class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory { - override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition -} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala deleted file mode 100644 index 3c0acad..0000000 --- a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.container.grouper.stream - -import org.apache.samza.container.TaskName -import java.util -import org.apache.samza.system.SystemStreamPartition -import scala.collection.JavaConverters._ -import scala.collection.JavaConversions._ -import org.apache.samza.config.Config - -/** - * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each - * SystemStreamPartition into its own group, with the key being the string representation of the SystemStringPartition - */ -class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper { - override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy({ s=> new TaskName(s.toString)}).map(r => r._1 -> r._2.asJava) -} - -class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory { - override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition -} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala index c29853d..460d11c 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala @@ -142,4 +142,8 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging { def createCoordinatorStream(streamName: String) { throw new UnsupportedOperationException("Method not implemented.") } + + override def offsetComparator(offset1: String , offset2: String) = { + null + } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java new file mode 100644 index 0000000..2d6060e --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.config; + +import static org.junit.Assert.*; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; + +import org.apache.samza.Partition; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +public class TestTaskConfigJava { + + @Test + public void testGetBroadcastSystemStreamPartitions() { + HashMap<String, String> map = new HashMap<String, String>(); + map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, kafka.z-o-o#[12-14]"); + Config config = new MapConfig(map); + + TaskConfigJava taskConfig = new TaskConfigJava(config); + Set<SystemStreamPartition> systemStreamPartitionSet = taskConfig.getBroadcastSystemStreamPartitions(); + + HashSet<SystemStreamPartition> expected = new HashSet<SystemStreamPartition>(); + expected.add(new SystemStreamPartition("kafka", "foo", new Partition(4))); + expected.add(new SystemStreamPartition("kafka", "boo", new Partition(5))); + expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(12))); + expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(13))); + expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(14))); + assertEquals(expected, systemStreamPartitionSet); + + map.put("task.broadcast.inputs", "kafka.foo"); + taskConfig = new TaskConfigJava(new MapConfig(map)); + boolean catchCorrectException = false; + try { + taskConfig.getBroadcastSystemStreamPartitions(); + } catch (IllegalArgumentException e) { + catchCorrectException = true; + } + assertTrue(catchCorrectException); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java 
b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java new file mode 100644 index 0000000..2a8d447 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.container.grouper.stream; + +import static org.junit.Assert.*; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +public class TestGroupByPartition { + SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0)); + SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1)); + SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2)); + SystemStreamPartition ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1)); + SystemStreamPartition ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2)); + SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0)); + + @Test + public void testLocalStreamsGroupedCorrectly() { + HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>(); + GroupByPartition grouper = new GroupByPartition(); + Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(allSSPs); + // empty SSP set gets empty groups + assertTrue(emptyResult.isEmpty()); + + Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0); + Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs); + Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>(); + + HashSet<SystemStreamPartition> partition0 = new HashSet<SystemStreamPartition>(); + partition0.add(aa0); + partition0.add(ac0); + expectedResult.put(new TaskName("Partition 0"), partition0); + + HashSet<SystemStreamPartition> partition1 = new HashSet<SystemStreamPartition>(); + 
partition1.add(aa1); + partition1.add(ab1); + expectedResult.put(new TaskName("Partition 1"), partition1); + + HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>(); + partition2.add(aa2); + partition2.add(ab2); + expectedResult.put(new TaskName("Partition 2"), partition2); + + assertEquals(expectedResult, result); + } + + @Test + public void testBroadcastStreamsGroupedCorrectly() { + HashMap<String, String> configMap = new HashMap<String, String>(); + configMap.put("task.broadcast.inputs", "SystemA.StreamA#0, SystemA.StreamB#1"); + Config config = new MapConfig(configMap); + GroupByPartition grouper = new GroupByPartition(config); + + HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>(); + Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0); + Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs); + + Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>(); + + HashSet<SystemStreamPartition> partition0 = new HashSet<SystemStreamPartition>(); + partition0.add(aa0); // broadcast stream + partition0.add(ac0); + partition0.add(ab1); // broadcast stream + expectedResult.put(new TaskName("Partition 0"), partition0); + + HashSet<SystemStreamPartition> partition1 = new HashSet<SystemStreamPartition>(); + partition1.add(aa1); + partition1.add(ab1); // broadcast stream + partition1.add(aa0); // broadcast stream + expectedResult.put(new TaskName("Partition 1"), partition1); + + HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>(); + partition2.add(aa2); + partition2.add(ab2); + partition2.add(aa0); // broadcast stream + partition2.add(ab1); // broadcast stream + expectedResult.put(new TaskName("Partition 2"), partition2); + + assertEquals(expectedResult, result); + } + + @Test + public void testNoTaskOnlyContainsBroadcastStreams() { + HashMap<String, String> configMap = new HashMap<String, String>(); + 
configMap.put("task.broadcast.inputs", "SystemA.StreamA#0, SystemA.StreamB#1"); + Config config = new MapConfig(configMap); + GroupByPartition grouper = new GroupByPartition(config); + + HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>(); + Collections.addAll(allSSPs, aa0, ab1, ab2); + Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs); + + Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>(); + HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>(); + partition2.add(aa0); // broadcast stream + partition2.add(ab1); + partition2.add(ab2); // broadcast stream + expectedResult.put(new TaskName("Partition 2"), partition2); + + assertEquals(expectedResult, result); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java new file mode 100644 index 0000000..1bd14a4 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container.grouper.stream; + +import static org.junit.Assert.*; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +public class TestGroupBySystemStreamPartition { + SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0)); + SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1)); + SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2)); + SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0)); + + @Test + public void testLocalStreamGroupedCorrectly() { + HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>(); + + GroupBySystemStreamPartition grouper = new GroupBySystemStreamPartition(); + Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(allSSPs); + assertTrue(emptyResult.isEmpty()); + + Collections.addAll(allSSPs, aa0, aa1, aa2, ac0); + Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs); + Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>(); + + HashSet<SystemStreamPartition> partitionaa0 = new 
HashSet<SystemStreamPartition>(); + partitionaa0.add(aa0); + expectedResult.put(new TaskName(aa0.toString()), partitionaa0); + + HashSet<SystemStreamPartition> partitionaa1 = new HashSet<SystemStreamPartition>(); + partitionaa1.add(aa1); + expectedResult.put(new TaskName(aa1.toString()), partitionaa1); + + HashSet<SystemStreamPartition> partitionaa2 = new HashSet<SystemStreamPartition>(); + partitionaa2.add(aa2); + expectedResult.put(new TaskName(aa2.toString()), partitionaa2); + + HashSet<SystemStreamPartition> partitionac0 = new HashSet<SystemStreamPartition>(); + partitionac0.add(ac0); + expectedResult.put(new TaskName(ac0.toString()), partitionac0); + + assertEquals(expectedResult, result); + } + + @Test + public void testBroadcastStreamGroupedCorrectly() { + HashMap<String, String> configMap = new HashMap<String, String>(); + configMap.put("task.broadcast.inputs", "SystemA.StreamA#0"); + Config config = new MapConfig(configMap); + + HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>(); + Collections.addAll(allSSPs, aa0, aa1, aa2, ac0); + GroupBySystemStreamPartition grouper = new GroupBySystemStreamPartition(config); + Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs); + + Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>(); + + HashSet<SystemStreamPartition> partitionaa1 = new HashSet<SystemStreamPartition>(); + partitionaa1.add(aa1); + partitionaa1.add(aa0); + expectedResult.put(new TaskName(aa1.toString()), partitionaa1); + + HashSet<SystemStreamPartition> partitionaa2 = new HashSet<SystemStreamPartition>(); + partitionaa2.add(aa2); + partitionaa2.add(aa0); + expectedResult.put(new TaskName(aa2.toString()), partitionaa2); + + HashSet<SystemStreamPartition> partitionac0 = new HashSet<SystemStreamPartition>(); + partitionac0.add(ac0); + partitionac0.add(aa0); + expectedResult.put(new TaskName(ac0.toString()), partitionac0); + + 
assertEquals(expectedResult, result); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index daa5eab..c00ef91 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -49,9 +49,9 @@ class TestOffsetManager { val offsetManager = OffsetManager(systemStreamMetadata, config) offsetManager.register(taskName, Set(systemStreamPartition)) offsetManager.start - assertFalse(offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined) - assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined) - assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get) + assertFalse(offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).isDefined) + assertTrue(offsetManager.getStartingOffset(taskName, systemStreamPartition).isDefined) + assertEquals("0", offsetManager.getStartingOffset(taskName, systemStreamPartition).get) } @Test @@ -73,14 +73,14 @@ class TestOffsetManager { assertEquals(taskName, checkpointManager.registered.head) assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(taskName)) // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset. 
- assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get) - assertEquals("45", offsetManager.getLastProcessedOffset(systemStreamPartition).get) - offsetManager.update(systemStreamPartition, "46") - assertEquals("46", offsetManager.getLastProcessedOffset(systemStreamPartition).get) - offsetManager.update(systemStreamPartition, "47") - assertEquals("47", offsetManager.getLastProcessedOffset(systemStreamPartition).get) + assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get) + assertEquals("45", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get) + offsetManager.update(taskName, systemStreamPartition, "46") + assertEquals("46", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get) + offsetManager.update(taskName, systemStreamPartition, "47") + assertEquals("47", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get) // Should never update starting offset. - assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get) + assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get) offsetManager.checkpoint(taskName) val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47")) assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName)) @@ -103,11 +103,11 @@ class TestOffsetManager { // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset. 
offsetManager.checkpoint(taskName) assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue) - offsetManager.update(systemStreamPartition, "46") - offsetManager.update(systemStreamPartition, "47") + offsetManager.update(taskName, systemStreamPartition, "46") + offsetManager.update(taskName, systemStreamPartition, "47") offsetManager.checkpoint(taskName) assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue) - offsetManager.update(systemStreamPartition, "48") + offsetManager.update(taskName, systemStreamPartition, "48") offsetManager.checkpoint(taskName) assertEquals("48", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue) } @@ -133,7 +133,7 @@ class TestOffsetManager { assertEquals(taskName, checkpointManager.registered.head) assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName)) // Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset. 
- assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get) + assertEquals("0", offsetManager.getStartingOffset(taskName, systemStreamPartition).get) } @Test @@ -223,7 +223,7 @@ class TestOffsetManager { offsetManager.start assertTrue(checkpointManager.isStarted) assertEquals(1, checkpointManager.registered.size) - assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null)) + assertNull(offsetManager.getLastProcessedOffset(taskName, systemStreamPartition1).getOrElse(null)) } @Test @@ -236,7 +236,7 @@ class TestOffsetManager { val offsetManager = new OffsetManager(offsetSettings = Map(ssp.getSystemStream -> settings)) offsetManager.register(taskName, Set(ssp)) offsetManager.start - assertEquals(Some("13"), offsetManager.getStartingOffset(ssp)) + assertEquals(Some("13"), offsetManager.getStartingOffset(taskName, ssp)) } @@ -255,10 +255,7 @@ class TestOffsetManager { override def stop { isStopped = true } // Only for testing purposes - not present in actual checkpoint manager - def getOffets: util.Map[SystemStreamPartition, String] = - { - checkpoint.getOffsets() - } + def getOffets = Map(taskName -> mapAsScalaMap(checkpoint.getOffsets()).toMap) } } @@ -281,6 +278,8 @@ class TestOffsetManager { override def createCoordinatorStream(streamName: String) { new UnsupportedOperationException("Method not implemented.") } + + override def offsetComparator(offset1: String, offset2: String) = null } } } http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala index b9d9e73..b4d6f35 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala +++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala @@ -20,6 +20,7 @@ package org.apache.samza.container import org.junit.Test +import org.junit.Assert._ import org.mockito.Matchers import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock @@ -243,4 +244,19 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat testMetrics.commits.getCount should equal(1L) testMetrics.windows.getCount should equal(1L) } + + @Test + def testGetSystemStreamPartitionToTaskInstancesMapping { + val ti0 = mock[TaskInstance] + val ti1 = mock[TaskInstance] + val ti2 = mock[TaskInstance] + when(ti0.systemStreamPartitions).thenReturn(Set(ssp0)) + when(ti1.systemStreamPartitions).thenReturn(Set(ssp1)) + when(ti2.systemStreamPartitions).thenReturn(Set(ssp1)) + + val mockTaskInstances = Map(taskName0 -> ti0, taskName1 -> ti1, new TaskName("2") -> ti2) + val runLoop = new RunLoop(mockTaskInstances, null, new SamzaContainerMetrics) + val expected = Map(ssp0 -> List(ti0), ssp1 -> List(ti1, ti2)) + assertEquals(expected, runLoop.getSystemStreamPartitionToTaskInstancesMapping) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 4db6d5c..6de8710 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -152,6 +152,7 @@ class TestSamzaContainer extends AssertionsForJUnit { taskName, config, new TaskInstanceMetrics, + null, consumerMultiplexer, collector, containerContext 
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 7caad28..5457f0e 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -20,7 +20,6 @@ package org.apache.samza.container import java.util.concurrent.ConcurrentHashMap - import org.apache.samza.SamzaException import org.apache.samza.Partition import org.apache.samza.checkpoint.OffsetManager @@ -44,8 +43,9 @@ import org.apache.samza.task._ import org.junit.Assert._ import org.junit.Test import org.scalatest.Assertions.intercept - import scala.collection.JavaConversions._ +import org.apache.samza.system.SystemAdmin +import scala.collection.mutable.ListBuffer class TestTaskInstance { @Test @@ -64,6 +64,7 @@ class TestTaskInstance { new SerdeManager) val systemStream = new SystemStream("test-system", "test-stream") val systemStreamPartition = new SystemStreamPartition(systemStream, partition) + val systemStreamPartitions = Set(systemStreamPartition) // Pretend our last checkpointed (next) offset was 2. val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2"))) val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config) @@ -75,15 +76,17 @@ class TestTaskInstance { taskName, config, new TaskInstanceMetrics, + null, consumerMultiplexer, collector, containerContext, - offsetManager) + offsetManager, + systemStreamPartitions = systemStreamPartitions) // Pretend we got a message with offset 2 and next offset 3. 
val coordinator = new ReadableCoordinator(taskName) taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator) // Check to see if the offset manager has been properly updated with offset 3. - val lastProcessedOffset = offsetManager.getLastProcessedOffset(systemStreamPartition) + val lastProcessedOffset = offsetManager.getLastProcessedOffset(taskName, systemStreamPartition) assertTrue(lastProcessedOffset.isDefined) assertEquals("2", lastProcessedOffset.get) } @@ -156,6 +159,7 @@ class TestTaskInstance { new SerdeManager) val systemStream = new SystemStream("test-system", "test-stream") val systemStreamPartition = new SystemStreamPartition(systemStream, partition) + val systemStreamPartitions = Set(systemStreamPartition) // Pretend our last checkpointed (next) offset was 2. val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2"))) val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config) @@ -170,10 +174,12 @@ class TestTaskInstance { taskName, config, taskMetrics, + null, consumerMultiplexer, collector, containerContext, offsetManager, + systemStreamPartitions = systemStreamPartitions, exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config)) val coordinator = new ReadableCoordinator(taskName) @@ -210,6 +216,7 @@ class TestTaskInstance { new SerdeManager) val systemStream = new SystemStream("test-system", "test-stream") val systemStreamPartition = new SystemStreamPartition(systemStream, partition) + val systemStreamPartitions = Set(systemStreamPartition) // Pretend our last checkpointed (next) offset was 2. 
val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2"))) val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config) @@ -224,10 +231,12 @@ class TestTaskInstance { taskName, config, taskMetrics, + null, consumerMultiplexer, collector, containerContext, offsetManager, + systemStreamPartitions = systemStreamPartitions, exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config)) val coordinator = new ReadableCoordinator(taskName) @@ -242,7 +251,6 @@ class TestTaskInstance { assertEquals(1L, getCount(group, classOf[FatalException].getName)) } - /** * Tests that the init() method of task can override the existing offset * assignment. @@ -258,8 +266,7 @@ class TestTaskInstance { override def init(config: Config, context: TaskContext): Unit = { assertTrue("Can only update offsets for assigned partition", - context.getSystemStreamPartitions.contains(partition1) - ) + context.getSystemStreamPartitions.contains(partition1)) context.setStartingOffset(partition1, "10") } @@ -278,22 +285,85 @@ class TestTaskInstance { val offsetManager = new OffsetManager() - offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0") + offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "0") val taskInstance = new TaskInstance( task, taskName, config, metrics, + null, consumers, collector, containerContext, offsetManager, - systemStreamPartitions = Set(partition0, partition1) ) + systemStreamPartitions = Set(partition0, partition1)) taskInstance.initTask - assertEquals(Some("0"), offsetManager.getStartingOffset(partition0)) - assertEquals(Some("10"), offsetManager.getStartingOffset(partition1)) + assertEquals(Some("0"), offsetManager.getStartingOffset(taskName, partition0)) + assertEquals(Some("10"), offsetManager.getStartingOffset(taskName, partition1)) + } + + @Test + def testIgnoreMessagesOlderThanStartingOffsets { + 
val partition0 = new SystemStreamPartition("system", "stream", new Partition(0)) + val partition1 = new SystemStreamPartition("system", "stream", new Partition(1)) + val config = new MapConfig() + val chooser = new RoundRobinChooser() + val consumers = new SystemConsumers(chooser, consumers = Map.empty) + val producers = new SystemProducers(Map.empty, new SerdeManager()) + val metrics = new TaskInstanceMetrics() + val taskName = new TaskName("testing") + val collector = new TaskInstanceCollector(producers) + val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName)) + val offsetManager = new OffsetManager() + offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100") + val systemAdmins = Map("system" -> new MockSystemAdmin) + var result = new ListBuffer[IncomingMessageEnvelope] + + val task = new StreamTask { + def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { + result += envelope + } + } + + val taskInstance = new TaskInstance( + task, + taskName, + config, + metrics, + systemAdmins, + consumers, + collector, + containerContext, + offsetManager, + systemStreamPartitions = Set(partition0, partition1)) + + val coordinator = new ReadableCoordinator(taskName) + val envelope1 = new IncomingMessageEnvelope(partition0, "1", null, null) + val envelope2 = new IncomingMessageEnvelope(partition0, "2", null, null) + val envelope3 = new IncomingMessageEnvelope(partition1, "1", null, null) + val envelope4 = new IncomingMessageEnvelope(partition1, "102", null, null) + + taskInstance.process(envelope1, coordinator) + taskInstance.process(envelope2, coordinator) + taskInstance.process(envelope3, coordinator) + taskInstance.process(envelope4, coordinator) + + val expected = List(envelope1, envelope2, envelope4) + assertEquals(expected, result.toList) + } +} + +class MockSystemAdmin extends SystemAdmin { + override def getOffsetsAfter(offsets: 
java.util.Map[SystemStreamPartition, String]) = { offsets } + override def getSystemStreamMetadata(streamNames: java.util.Set[String]) = null + override def createCoordinatorStream(stream: String) = {} + override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {} + override def validateChangelogStream(topicName: String, numOfPartitions: Int) = {} + + override def offsetComparator(offset1: String, offset2: String) = { + offset1.toLong compare offset2.toLong } }
