> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > Mostly looks good. Have some questions:
> > * Have you tried moving the message "filtering" logic to the container level
> > instead of the task level? Not sure which is simpler in terms of code
> > change. Since the container has access to all the task instances and the
> > systemAdmins, it seems convenient to have the caughtUp map within
> > containerContext. I could be wrong :)
> > * I want to test the patch locally before confirming a ship it. Looks
> > awesome for a first draft!
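Navina's question is about where the per-SSP "caught up" state should live. A minimal sketch of the task-level variant follows, under stated assumptions: offsets are numeric (Kafka-style), `OffsetComparator` is a hypothetical stand-in for the offsetComparator the patch adds to SystemAdmin (its real name and signature may differ), and `Ssp`, `CaughtUpFilter` and `shouldProcess` are illustrative names, not code from the patch. Each task skips messages of a shared SSP until it reaches its own starting offset, then processes everything after that.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the offset comparison added to SystemAdmin;
// the real method name and signature may differ.
trait OffsetComparator {
  /** Negative if a < b, zero if a == b, positive if a > b. */
  def compare(a: String, b: String): Int
}

// Assumes numeric offsets, as with Kafka.
object NumericOffsetComparator extends OffsetComparator {
  def compare(a: String, b: String): Int =
    java.lang.Long.compare(a.toLong, b.toLong)
}

// Simplified SystemStreamPartition, for illustration only.
final case class Ssp(system: String, stream: String, partition: Int)

// One instance per task: the task owns its "caught up" state, so nothing
// has to be pushed from the container once the run loop has started.
class CaughtUpFilter(startingOffsets: Map[Ssp, String], comparator: OffsetComparator) {
  private val caughtUp = mutable.Set.empty[Ssp]

  /** Returns true if this task should process the message at `offset`. */
  def shouldProcess(ssp: Ssp, offset: String): Boolean =
    if (caughtUp.contains(ssp)) true
    else startingOffsets.get(ssp) match {
      // No starting offset recorded for this SSP: nothing to skip.
      case None =>
        caughtUp += ssp
        true
      // Reached this task's own starting offset: process from here on.
      case Some(start) if comparator.compare(offset, start) >= 0 =>
        caughtUp += ssp
        true
      // Older message that another task sharing the SSP needed: skip it.
      case Some(_) =>
        false
    }
}

object CaughtUpFilterDemo {
  def main(args: Array[String]): Unit = {
    val ssp = Ssp("kafka", "global-stream", 0)
    val filter = new CaughtUpFilter(Map(ssp -> "5"), NumericOffsetComparator)
    println(Seq("3", "4", "5", "6").map(filter.shouldProcess(ssp, _)))
  }
}
```

With a starting offset of 5, offsets 3 and 4 are skipped and 5 and 6 are processed. Keeping the set inside each task avoids having to update anything from the container after the run loop starts.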
The container only initializes the task instances. At startup, the container has all the information about the tasks and systems, but once the RunLoop is started, the containerContext is no longer updated. That makes it difficult for the container to tell the task instances when their offsets have caught up.

> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala,
> > line 412
> > <https://reviews.apache.org/r/34974/diff/4/?file=986051#file986051line412>
> >
> > The exception message is inaccurate. It can also happen when the
> > taskName is not in startingOffsets map (although I am not sure if such a
> > case will happen).

If the taskName is not in the startingOffsets map, this exception will not be thrown. (It is inside the loop of line 374.)

> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala,
> > line 144
> > <https://reviews.apache.org/r/34974/diff/4/?file=986055#file986055line144>
> >
> > Should we have a different metric for number of messages received by
> > process() than the number of messages actually processed?
> > We need to clarify the semantics of all our metrics, in perhaps a
> > separate RB

Yes, fixed.

> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala,
> > line 132
> > <https://reviews.apache.org/r/34974/diff/4/?file=986055#file986055line132>
> >
> > instead of getOrElse(null), try .orNull

Fixed.

> On June 18, 2015, 10:37 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala,
> > line 109
> > <https://reviews.apache.org/r/34974/diff/4/?file=986071#file986071line109>
> >
> > We are registering with the offset in the method invocation in Line
> > 105. Why do we need to update the topicPartitionsAndOffsets map with the
> > replaced offset?
> >
> > I understand that all tasks within the same container may be at
> > different offsets for broadcast stream SSPs. But it looks like
> > consumer.register is being invoked in multiple places - TaskStorageManager
> > & CoordinatorStreamSystemConsumer. Will the change impact these other
> > components?

Why do we need to update the topicPartitionsAndOffsets map with the replaced offset? -- Because line 105 only registers the offset in BlockingEnvelopeMap, which is used for buffering. It has nothing to do with the starting offsets from which the consumer will consume.

Will the change impact these other components? -- No. If the stream is a changelog, each task has its own changelog partition, so no partition is assigned to more than one task and the replacement never happens; the code behaves the same as before this patch. If the stream is the coordinator stream, we always consume from the beginning, so the starting offset is always 0.

- Yan

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review88308
-----------------------------------------------------------


On June 22, 2015, 6:07 a.m., Yan Fang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34974/
> -----------------------------------------------------------
>
> (Updated June 22, 2015, 6:07 a.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-676
>     https://issues.apache.org/jira/browse/SAMZA-676
>
>
> Repository: samza
>
>
> Description
> -------
>
> 1. added offsetComparator method to the SystemAdmin interface
>
> 2. added "task.global.inputs" config
>
> 3. rewrote Grouper classes in Java; they allow assigning global streams during
> grouping
>
> 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer
> to preserve message order
>
> 5. added taskNames to the offsets in OffsetManager
>
> 6. allowed assigning one SSP to multiple taskInstances
>
> 7. skipped already-processed messages in RunLoop
>
> 8. unit tests for all changes
>
>
> Diffs
> -----
>
> checkstyle/import-control.xml 3374f0c
> docs/learn/documentation/versioned/container/samza-container.md 9f46414
> docs/learn/documentation/versioned/jobs/configuration-table.html 405e2ce
> samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
> samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
> samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
> samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cbacd18
> samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
> samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
> samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
> samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
> samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 9fb1aa9
> samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
> samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
> samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
> samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f5
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
> samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b4
>
> Diff: https://reviews.apache.org/r/34974/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Yan Fang
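The KafkaSystemConsumer exchange in this thread (why topicPartitionsAndOffsets must change when several tasks register the same broadcast partition at different offsets) can be illustrated with a minimal sketch. It assumes numeric, Kafka-style offsets and that the offset kept is the oldest one any task needs, so that tasks further ahead rely on the "skip already-processed messages in RunLoop" change. `StartingOffsetRegistry`, `TopicAndPartition`, `isOlder` and `startingOffset` are hypothetical names for illustration, not the patch's code.

```scala
import scala.collection.mutable

// Simplified topic/partition key, for illustration only.
final case class TopicAndPartition(topic: String, partition: Int)

// Hypothetical sketch of the consumer-side bookkeeping. Assumption: offsets
// are numeric and the consumer must start from the earliest offset any task
// sharing the partition still needs.
class StartingOffsetRegistry {
  private val topicPartitionsAndOffsets = mutable.Map.empty[TopicAndPartition, String]

  private def isOlder(a: String, b: String): Boolean = a.toLong < b.toLong

  /** Called once per task that registers `tp`; keeps the oldest offset seen. */
  def register(tp: TopicAndPartition, offset: String): Unit =
    topicPartitionsAndOffsets.get(tp) match {
      // Existing offset is the same or older: keep it.
      case Some(existing) if !isOlder(offset, existing) => ()
      // First registration, or a task needs older messages: replace.
      case _ => topicPartitionsAndOffsets(tp) = offset
    }

  /** Offset the consumer would start fetching `tp` from, if registered. */
  def startingOffset(tp: TopicAndPartition): Option[String] =
    topicPartitionsAndOffsets.get(tp)
}

object StartingOffsetRegistryDemo {
  def main(args: Array[String]): Unit = {
    val registry = new StartingOffsetRegistry
    val tp = TopicAndPartition("global-stream", 0)
    Seq("10", "4", "7").foreach(registry.register(tp, _))
    println(registry.startingOffset(tp))
  }
}
```

Registering offsets 10, 4 and 7 for the same partition leaves the consumer starting at 4. A changelog partition, which belongs to a single task, is only ever registered once, so the replacement branch is never taken for it.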