Repository: samza
Updated Branches:
  refs/heads/master 79ec5dbfc -> 1f77f8b98


http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
deleted file mode 100644
index a14169b..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.grouper.stream
-
-import java.util.Collections
-import java.util.HashSet
-import java.util.Map
-import java.util.Set
-
-import org.apache.samza.Partition
-import org.apache.samza.container.TaskName
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.Test
-import org.junit.Assert._
-
-object GroupByTestBase {
-  val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0))
-  val aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1))
-  val aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2))
-  val ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1))
-  val ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2))
-  val ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0))
-  val allSSPs = new HashSet[SystemStreamPartition]
-  Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0)
-}
-
-abstract class GroupByTestBase {
-  def getGrouper: SystemStreamPartitionGrouper
-
-  @Test
-  def emptySetReturnsEmptyMap {
-    val grouper: SystemStreamPartitionGrouper = getGrouper
-    val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(new 
HashSet[SystemStreamPartition])
-    assertTrue(result.isEmpty)
-  }
-
-  def verifyGroupGroupsCorrectly(input: Set[SystemStreamPartition], output: 
Map[TaskName, Set[SystemStreamPartition]]) {
-    val grouper: SystemStreamPartitionGrouper = getGrouper
-    val result: Map[TaskName, Set[SystemStreamPartition]] = 
grouper.group(input)
-    assertEquals(output, result)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
deleted file mode 100644
index 74daf72..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.grouper.stream
-
-import org.apache.samza.container.TaskName
-import org.junit.Test
-
-import scala.collection.JavaConverters._
-
-class TestGroupByPartition extends GroupByTestBase {
-  import GroupByTestBase._
-
-  // from base class provided set
-  val expected = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
-                     new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
-                     new TaskName("Partition 2") -> Set(aa2, 
ab2).asJava).asJava
-
-  override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition
-
-  @Test def groupingWorks() {
-    verifyGroupGroupsCorrectly(allSSPs, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
deleted file mode 100644
index deb3895..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.grouper.stream
-
-import org.apache.samza.container.TaskName
-import org.junit.Test
-import scala.collection.JavaConverters._
-
-class TestGroupBySystemStreamPartition extends GroupByTestBase {
-  import GroupByTestBase._
-
-  // Building manually to avoid just duplicating a logic potential logic error here and there
-  // From base class provided set
-  val expected = Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
-                     new TaskName(aa1.toString) -> Set(aa1).asJava,
-                     new TaskName(aa2.toString) -> Set(aa2).asJava,
-                     new TaskName(ab1.toString) -> Set(ab1).asJava,
-                     new TaskName(ab2.toString) -> Set(ab2).asJava,
-                     new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
-
-  override def getGrouper: SystemStreamPartitionGrouper = new 
GroupBySystemStreamPartition
-
-  @Test def groupingWorks() {
-    verifyGroupGroupsCorrectly(allSSPs, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index d47de7d..1393da8 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -274,4 +274,6 @@ class MockSystemAdmin extends SystemAdmin {
   override def createCoordinatorStream(streamName: String) {
     new UnsupportedOperationException("Method not implemented.")
   }
+  
+  override def offsetComparator(offset1: String, offset2: String) = null
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
index 1f5751e..446534a 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
@@ -67,4 +67,9 @@ public class ElasticsearchSystemAdmin implements SystemAdmin {
   public void validateChangelogStream(String streamName, int numOfPartitions) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+         throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
index c18e90d..92eb447 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
@@ -46,4 +46,7 @@ class HdfsSystemAdmin extends SystemAdmin with Logging {
     throw new UnsupportedOperationException("Method not implemented.")
   }
 
+  def offsetComparator(offset1: String, offset2: String) = {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties
index b590e29..9767107 100644
--- a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties
+++ b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties
@@ -1,2 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 systems.samza-hdfs-test-batch-job-text.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
 systems.samza-hdfs-test-batch-job-text.producer.hdfs.write.batch.size.bytes=512

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties
index ab90548..cefffc9 100644
--- a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties
+++ b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties
@@ -1 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 systems.samza-hdfs-test-batch-job.producer.hdfs.write.batch.size.bytes=512

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties
index 9df1397..ca1977b 100644
--- a/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties
+++ b/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties
@@ -1 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 systems.samza-hdfs-test-job-text.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties
index e69de29..13a8339 100644
--- a/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties
+++ b/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 4391e24..aee33a9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -25,11 +25,11 @@ import org.apache.samza.SamzaException
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, 
ExponentialSleepStrategy, Logging}
+import org.apache.samza.util.{ ClientUtilTopicMetadataStore, 
ExponentialSleepStrategy, Logging }
 import kafka.api._
 import kafka.consumer.SimpleConsumer
-import kafka.common.{TopicExistsException, TopicAndPartition}
-import java.util.{Properties, UUID}
+import kafka.common.{ TopicExistsException, TopicAndPartition }
+import java.util.{ Properties, UUID }
 import scala.collection.JavaConversions._
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import kafka.consumer.ConsumerConfig
@@ -134,8 +134,7 @@ class KafkaSystemAdmin(
    * Replication factor for the Changelog topic in kafka
    * Kafka properties to be used during the Changelog topic creation
    */
-  topicMetaInformation: Map[String, ChangelogInfo] =  Map[String, 
ChangelogInfo]()
-  ) extends SystemAdmin with Logging {
+  topicMetaInformation: Map[String, ChangelogInfo] = Map[String, 
ChangelogInfo]()) extends SystemAdmin with Logging {
 
   import KafkaSystemAdmin._
 
@@ -322,7 +321,7 @@ class KafkaSystemAdmin(
   private def createTopicInKafka(topicName: String, 
numKafkaChangelogPartitions: Int) {
     val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
     info("Attempting to create change log topic %s." format topicName)
-    info("Using partition count "+ numKafkaChangelogPartitions + " for 
creating change log topic")
+    info("Using partition count " + numKafkaChangelogPartitions + " for 
creating change log topic")
     val topicMetaInfo = topicMetaInformation.getOrElse(topicName, throw new 
KafkaChangelogException("Unable to find topic information for topic " + 
topicName))
     retryBackoff.run(
       loop => {
@@ -348,11 +347,10 @@ class KafkaSystemAdmin(
             info("Changelog topic %s already exists." format topicName)
             loop.done
           case e: Exception =>
-            warn("Failed to create topic %s: %s. Retrying." format(topicName, 
e))
+            warn("Failed to create topic %s: %s. Retrying." format (topicName, 
e))
             debug("Exception detail:", e)
         }
-      }
-    )
+      })
   }
 
   private def validateTopicInKafka(topicName: String, 
numKafkaChangelogPartitions: Int) {
@@ -367,7 +365,7 @@ class KafkaSystemAdmin(
 
         val partitionCount = topicMetadata.partitionsMetadata.length
         if (partitionCount < numKafkaChangelogPartitions) {
-          throw new KafkaChangelogException("Changelog topic validation failed 
for topic %s because partition count %s did not match expected partition count 
of %d" format(topicName, topicMetadata.partitionsMetadata.length, 
numKafkaChangelogPartitions))
+          throw new KafkaChangelogException("Changelog topic validation failed 
for topic %s because partition count %s did not match expected partition count 
of %d" format (topicName, topicMetadata.partitionsMetadata.length, 
numKafkaChangelogPartitions))
         }
 
         info("Successfully validated changelog topic %s." format topicName)
@@ -378,11 +376,10 @@ class KafkaSystemAdmin(
         exception match {
           case e: KafkaChangelogException => throw e
           case e: Exception =>
-            warn("While trying to validate topic %s: %s. Retrying." 
format(topicName, e))
+            warn("While trying to validate topic %s: %s. Retrying." format 
(topicName, e))
             debug("Exception detail:", e)
         }
-      }
-    )
+      })
   }
 
   /**
@@ -405,4 +402,15 @@ class KafkaSystemAdmin(
   override def validateChangelogStream(topicName: String, 
numKafkaChangelogPartitions: Int) = {
     validateTopicInKafka(topicName, numKafkaChangelogPartitions)
   }
+
+  /**
+   * Compare the two offsets. Returns x where x < 0 if offset1 < offset2;
+   * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2.
+   *
+   * Currently it's used in the context of the broadcast streams to detect
+   * the mismatch between two streams when consuming the broadcast streams.
+   */
+  override def offsetComparator(offset1: String, offset2: String) = {
+    offset1.toLong compare offset2.toLong
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index de00320..c948d64 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -40,6 +40,7 @@ import kafka.api.TopicMetadata
 import org.apache.samza.util.ExponentialSleepStrategy
 import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConversions._
+import org.apache.samza.system.SystemAdmin
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -55,6 +56,7 @@ object KafkaSystemConsumer {
  */
 private[kafka] class KafkaSystemConsumer(
   systemName: String,
+  systemAdmin: SystemAdmin,
   metrics: KafkaSystemConsumerMetrics,
   metadataStore: TopicMetadataStore,
   clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
@@ -102,7 +104,12 @@ private[kafka] class KafkaSystemConsumer(
   override def register(systemStreamPartition: SystemStreamPartition, offset: 
String) {
     super.register(systemStreamPartition, offset)
 
-    topicPartitionsAndOffsets += 
KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition) -> offset
+    val topicAndPartition = 
KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
+    val existingOffset = 
topicPartitionsAndOffsets.getOrElseUpdate(topicAndPartition, offset)
+    // register the older offset in the consumer
+    if (systemAdmin.offsetComparator(existingOffset, offset) >= 0) {
+      topicPartitionsAndOffsets.replace(topicAndPartition, offset)
+    }
 
     
metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 1629035..d84bf06 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -64,6 +64,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
 
     new KafkaSystemConsumer(
       systemName = systemName,
+      systemAdmin = getAdmin(systemName, config),
       metrics = metrics,
       metadataStore = metadataStore,
       clientId = clientId,

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 2a84328..23fa939 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -23,18 +23,22 @@ import kafka.api.TopicMetadata
 import kafka.api.PartitionMetadata
 import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
-
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import org.apache.samza.util.TopicMetadataStore
 import org.junit.Test
 import org.junit.Assert._
+import org.apache.samza.system.SystemAdmin
+import org.mockito.Mockito._
+import org.mockito.Matchers._
 
 class TestKafkaSystemConsumer {
+  val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin])
+
   @Test
   def testFetchThresholdShouldDivideEvenlyAmongPartitions {
     val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", new KafkaSystemConsumerMetrics, 
metadataStore, fetchThreshold = 50000) {
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new 
KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) {
       override def refreshBrokers {
       }
     }
@@ -53,13 +57,13 @@ class TestKafkaSystemConsumer {
     val systemName = "test-system"
     val streamName = "test-stream"
     val metrics = new KafkaSystemConsumerMetrics
-    // Lie and tell the store that the partition metadata is empty. We can't 
-    // use partition metadata because it has Broker in its constructor, which 
+    // Lie and tell the store that the partition metadata is empty. We can't
+    // use partition metadata because it has Broker in its constructor, which
     // is package private to Kafka. 
     val metadataStore = new MockMetadataStore(Map(streamName -> 
TopicMetadata(streamName, Seq.empty, 0)))
     var hosts = List[String]()
     var getHostPortCount = 0
-    val consumer = new KafkaSystemConsumer(systemName, metrics, metadataStore) 
{
+    val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, 
metadataStore) {
       override def getHostPort(topicMetadata: TopicMetadata, partition: Int): 
Option[(String, Int)] = {
         // Generate a unique host every time getHostPort is called.
         getHostPortCount += 1
@@ -89,8 +93,30 @@ class TestKafkaSystemConsumer {
     consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2)
     assertEquals(List("localhost-1", "localhost-2"), hosts)
   }
+
+  @Test
+  def testConsumerRegisterOlderOffsetOfTheSamzaSSP {
+    when(systemAdmin.offsetComparator(anyString, 
anyString)).thenCallRealMethod()
+
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new 
KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000)
+    val ssp0 = new SystemStreamPartition("test-system", "test-stream", new 
Partition(0))
+    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new 
Partition(1))
+    val ssp2 = new SystemStreamPartition("test-system", "test-stream", new 
Partition(2))
+
+    consumer.register(ssp0, "0")
+    consumer.register(ssp0, "5")
+    consumer.register(ssp1, "2")
+    consumer.register(ssp1, "3")
+    consumer.register(ssp2, "0")
+
+    assertEquals("0", 
consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp0)))
+    assertEquals("2", 
consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1)))
+    assertEquals("0", 
consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2)))
+  }
 }
 
 class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) 
extends TopicMetadataStore {
   def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
 }
+

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index f23b8f9..a05f89a 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -79,4 +79,9 @@ public class MockSystemAdmin implements SystemAdmin {
   public void createCoordinatorStream(String streamName) {
     throw new UnsupportedOperationException("Method not implemented.");
   }
+
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    return null;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 6f67cf5..2eec65f 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -447,4 +447,6 @@ class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
   override def createCoordinatorStream(streamName: String) {
     new UnsupportedOperationException("Method not implemented.")
   }
+
+  override def offsetComparator(offset1: String, offset2: String) = null
 }

Reply via email to