Repository: samza Updated Branches: refs/heads/master 79ec5dbfc -> 1f77f8b98
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala deleted file mode 100644 index a14169b..0000000 --- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container.grouper.stream - -import java.util.Collections -import java.util.HashSet -import java.util.Map -import java.util.Set - -import org.apache.samza.Partition -import org.apache.samza.container.TaskName -import org.apache.samza.system.SystemStreamPartition -import org.junit.Test -import org.junit.Assert._ - -object GroupByTestBase { - val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0)) - val aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1)) - val aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2)) - val ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1)) - val ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2)) - val ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0)) - val allSSPs = new HashSet[SystemStreamPartition] - Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0) -} - -abstract class GroupByTestBase { - def getGrouper: SystemStreamPartitionGrouper - - @Test - def emptySetReturnsEmptyMap { - val grouper: SystemStreamPartitionGrouper = getGrouper - val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(new HashSet[SystemStreamPartition]) - assertTrue(result.isEmpty) - } - - def verifyGroupGroupsCorrectly(input: Set[SystemStreamPartition], output: Map[TaskName, Set[SystemStreamPartition]]) { - val grouper: SystemStreamPartitionGrouper = getGrouper - val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input) - assertEquals(output, result) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala deleted file mode 100644 index 74daf72..0000000 --- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container.grouper.stream - -import org.apache.samza.container.TaskName -import org.junit.Test - -import scala.collection.JavaConverters._ - -class TestGroupByPartition extends GroupByTestBase { - import GroupByTestBase._ - - // from base class provided set - val expected = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava, - new TaskName("Partition 1") -> Set(aa1, ab1).asJava, - new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava - - override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition - - @Test def groupingWorks() { - verifyGroupGroupsCorrectly(allSSPs, expected) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deleted file mode 100644 index deb3895..0000000 --- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container.grouper.stream - -import org.apache.samza.container.TaskName -import org.junit.Test -import scala.collection.JavaConverters._ - -class TestGroupBySystemStreamPartition extends GroupByTestBase { - import GroupByTestBase._ - - // Building manually to avoid just duplicating a logic potential logic error here and there - // From base class provided set - val expected = Map(new TaskName(aa0.toString) -> Set(aa0).asJava, - new TaskName(aa1.toString) -> Set(aa1).asJava, - new TaskName(aa2.toString) -> Set(aa2).asJava, - new TaskName(ab1.toString) -> Set(ab1).asJava, - new TaskName(ab2.toString) -> Set(ab2).asJava, - new TaskName(ac0.toString) -> Set(ac0).asJava).asJava - - override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition - - @Test def groupingWorks() { - verifyGroupGroupsCorrectly(allSSPs, expected) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index d47de7d..1393da8 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -274,4 +274,6 @@ class MockSystemAdmin extends SystemAdmin { override def createCoordinatorStream(streamName: String) { new UnsupportedOperationException("Method not implemented.") } + + override def offsetComparator(offset1: String, offset2: String) = null } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java index 1f5751e..446534a 100644 --- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java +++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java @@ -67,4 +67,9 @@ public class ElasticsearchSystemAdmin implements SystemAdmin { public void validateChangelogStream(String streamName, int numOfPartitions) { throw new UnsupportedOperationException(); } + + @Override + public Integer offsetComparator(String offset1, String offset2) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala index c18e90d..92eb447 100644 --- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala +++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala @@ -46,4 +46,7 @@ class HdfsSystemAdmin extends SystemAdmin with Logging { throw new UnsupportedOperationException("Method not implemented.") } + def offsetComparator(offset1: String, offset2: String) = { + throw new UnsupportedOperationException("Method not implemented.") + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties index b590e29..9767107 100644 --- a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties +++ b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties @@ -1,2 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + systems.samza-hdfs-test-batch-job-text.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter systems.samza-hdfs-test-batch-job-text.producer.hdfs.write.batch.size.bytes=512 http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties index ab90548..cefffc9 100644 --- a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties +++ b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties @@ -1 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + systems.samza-hdfs-test-batch-job.producer.hdfs.write.batch.size.bytes=512 http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties index 9df1397..ca1977b 100644 --- a/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties +++ b/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties @@ -1 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + systems.samza-hdfs-test-job-text.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties index e69de29..13a8339 100644 --- a/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties +++ b/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 4391e24..aee33a9 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -25,11 +25,11 @@ import org.apache.samza.SamzaException import org.apache.samza.system.SystemAdmin import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging} +import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging } import kafka.api._ import kafka.consumer.SimpleConsumer -import kafka.common.{TopicExistsException, TopicAndPartition} -import java.util.{Properties, UUID} +import kafka.common.{ TopicExistsException, TopicAndPartition } +import java.util.{ Properties, UUID } import scala.collection.JavaConversions._ import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import kafka.consumer.ConsumerConfig @@ -134,8 +134,7 @@ class KafkaSystemAdmin( * Replication factor for the Changelog topic in kafka * Kafka properties to be used during the Changelog topic creation */ - topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]() - ) extends SystemAdmin with Logging { + topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]()) extends SystemAdmin with Logging { import KafkaSystemAdmin._ @@ -322,7 +321,7 @@ class KafkaSystemAdmin( private def createTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) { val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy info("Attempting to create change log topic %s." format topicName) - info("Using partition count "+ numKafkaChangelogPartitions + " for creating change log topic") + info("Using partition count " + numKafkaChangelogPartitions + " for creating change log topic") val topicMetaInfo = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName)) retryBackoff.run( loop => { @@ -348,11 +347,10 @@ class KafkaSystemAdmin( info("Changelog topic %s already exists." format topicName) loop.done case e: Exception => - warn("Failed to create topic %s: %s. Retrying." format(topicName, e)) + warn("Failed to create topic %s: %s. Retrying." format (topicName, e)) debug("Exception detail:", e) } - } - ) + }) } private def validateTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) { @@ -367,7 +365,7 @@ class KafkaSystemAdmin( val partitionCount = topicMetadata.partitionsMetadata.length if (partitionCount < numKafkaChangelogPartitions) { - throw new KafkaChangelogException("Changelog topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format(topicName, topicMetadata.partitionsMetadata.length, numKafkaChangelogPartitions)) + throw new KafkaChangelogException("Changelog topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format (topicName, topicMetadata.partitionsMetadata.length, numKafkaChangelogPartitions)) } info("Successfully validated changelog topic %s." format topicName) @@ -378,11 +376,10 @@ class KafkaSystemAdmin( exception match { case e: KafkaChangelogException => throw e case e: Exception => - warn("While trying to validate topic %s: %s. Retrying." format(topicName, e)) + warn("While trying to validate topic %s: %s. Retrying." format (topicName, e)) debug("Exception detail:", e) } - } - ) + }) } /** @@ -405,4 +402,15 @@ class KafkaSystemAdmin( override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { validateTopicInKafka(topicName, numKafkaChangelogPartitions) } + + /** + * Compare the two offsets. Returns x where x < 0 if offset1 < offset2; + * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2. + * + * Currently it's used in the context of the broadcast streams to detect + * the mismatch between two streams when consuming the broadcast streams. + */ + override def offsetComparator(offset1: String, offset2: String) = { + offset1.toLong compare offset2.toLong + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index de00320..c948d64 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -40,6 +40,7 @@ import kafka.api.TopicMetadata import org.apache.samza.util.ExponentialSleepStrategy import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConversions._ +import org.apache.samza.system.SystemAdmin object KafkaSystemConsumer { def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = { @@ -55,6 +56,7 @@ object KafkaSystemConsumer { */ private[kafka] class KafkaSystemConsumer( systemName: String, + systemAdmin: SystemAdmin, metrics: KafkaSystemConsumerMetrics, metadataStore: TopicMetadataStore, clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString, @@ -102,7 +104,12 @@ private[kafka] class KafkaSystemConsumer( override def register(systemStreamPartition: SystemStreamPartition, offset: String) { super.register(systemStreamPartition, offset) - topicPartitionsAndOffsets += KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition) -> offset + val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition) + val existingOffset = topicPartitionsAndOffsets.getOrElseUpdate(topicAndPartition, offset) + // register the older offset in the consumer + if (systemAdmin.offsetComparator(existingOffset, offset) >= 0) { + topicPartitionsAndOffsets.replace(topicAndPartition, offset) + } metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)) } http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 1629035..d84bf06 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -64,6 +64,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { new KafkaSystemConsumer( systemName = systemName, + systemAdmin = getAdmin(systemName, config), metrics = metrics, metadataStore = metadataStore, clientId = clientId, http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala index 2a84328..23fa939 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala @@ -23,18 +23,22 @@ import kafka.api.TopicMetadata import kafka.api.PartitionMetadata import kafka.cluster.Broker import kafka.common.TopicAndPartition - import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition import org.apache.samza.util.TopicMetadataStore import org.junit.Test import org.junit.Assert._ +import org.apache.samza.system.SystemAdmin +import org.mockito.Mockito._ +import org.mockito.Matchers._ class TestKafkaSystemConsumer { + val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin]) + @Test def testFetchThresholdShouldDivideEvenlyAmongPartitions { val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("", new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) { + val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) { override def refreshBrokers { } } @@ -53,13 +57,13 @@ class TestKafkaSystemConsumer { val systemName = "test-system" val streamName = "test-stream" val metrics = new KafkaSystemConsumerMetrics - // Lie and tell the store that the partition metadata is empty. We can't - // use partition metadata because it has Broker in its constructor, which + // Lie and tell the store that the partition metadata is empty. We can't + // use partition metadata because it has Broker in its constructor, which // is package private to Kafka. val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0))) var hosts = List[String]() var getHostPortCount = 0 - val consumer = new KafkaSystemConsumer(systemName, metrics, metadataStore) { + val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore) { override def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = { // Generate a unique host every time getHostPort is called. getHostPortCount += 1 @@ -89,8 +93,30 @@ class TestKafkaSystemConsumer { consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2) assertEquals(List("localhost-1", "localhost-2"), hosts) } + + @Test + def testConsumerRegisterOlderOffsetOfTheSamzaSSP { + when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod() + + val metadataStore = new MockMetadataStore + val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) + val ssp0 = new SystemStreamPartition("test-system", "test-stream", new Partition(0)) + val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(1)) + val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(2)) + + consumer.register(ssp0, "0") + consumer.register(ssp0, "5") + consumer.register(ssp1, "2") + consumer.register(ssp1, "3") + consumer.register(ssp2, "0") + + assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp0))) + assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1))) + assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2))) + } } class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore { def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata } + http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java index f23b8f9..a05f89a 100644 --- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java +++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java @@ -79,4 +79,9 @@ public class MockSystemAdmin implements SystemAdmin { public void createCoordinatorStream(String streamName) { throw new UnsupportedOperationException("Method not implemented."); } + + @Override + public Integer offsetComparator(String offset1, String offset2) { + return null; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala index 6f67cf5..2eec65f 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala @@ -447,4 +447,6 @@ class MockSystemAdmin(numTasks: Int) extends SystemAdmin { override def createCoordinatorStream(streamName: String) { new UnsupportedOperationException("Method not implemented.") } + + override def offsetComparator(offset1: String, offset2: String) = null }
