On Aug. 19, 2014, 7:34 a.m., Chris Riccomini wrote:
> > It looks good to me, except for two unused imports and one typo. :)
> > 
> > Overall, two thoughts:
> > 1. "collector" and "producers" seem interchangeable in the patch. For example, 
> > in the TaskInstance.registerProducers method, we actually call 
> > "collector.register"; in TaskInstance.commit, we use "Flushing producers for 
> > taskName:" as the log message for collector.flush. So what is the real 
> > difference between "collectors" and "producers"?
> > 2. From my perspective, producerMultiplexer seems to do all the work 
> > (correct me if I am missing something). So why do we use 
> > TaskInstanceCollector here?
> 
> Chris Riccomini wrote:
>     The reason that we need something like TaskInstanceCollector is because 
> SystemProducers segments everything according to a "source" (a task name). 
> This is so that if one TaskInstance (StreamTask) calls 
> TaskCoordinator.commit, it only commits for *that* task, not all tasks in the 
> container. Before we had this change, it was not intuitive, since calling 
> things like commit/flush would actually commit and flush all output for all 
> tasks in the container. This led to a lot of latency, and having to 
> coordinate between StreamTasks to figure out when it was safe to commit.
>     
>     We have SystemProducers.send(source, envelope) for this reason. This way, 
> the producers can buffer and flush outgoing messages grouped by source, and 
> thus flush only messages for a given source. This means that we now have an 
> API mismatch between MessageCollector.send(envelope), which has the source 
> implicit (based on the StreamTask doing the sending), and the 
> SystemProducers.send(source, envelope), which has the source explicitly 
> defined. To get around this, you can either (1) wrap the SystemProducers.send 
> call with a proper source defined on a per-TaskInstance basis, or (2) provide 
> some callback mechanism, and use SystemProducers.register to "listen" to 
> collectors that are registered by source.
>     
>     I opted for approach (1), since (2) would cause us to have to register 
> the SystemProducers prematurely in the SamzaContainer.main method (since 
> StorageEngineFactory.getStorageEngine also requires the same collector as 
> used for the TaskInstance).
>     
>     I'm wondering if part of this is just naming? The TaskInstanceCollector 
> is really more of a wrapper around the SystemProducers. It just so happens 
> that it also implements the MessageCollector, and is used in that way in the 
> task.process, and task.window calls?

Thanks for the explanation. Using TaskInstanceCollector as a wrapper for 
SystemProducers to overcome the API mismatch makes sense to me. (Yes, the name 
was a little confusing; I was expecting a "collector" to do more than just wrap 
something.) Since this is not end-user-facing code, I am not picky about the 
name as long as new developers can understand it. Other than that, I am OK with 
the patch. It seems Chinmay has opened an issue.


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24820/#review50959
-----------------------------------------------------------


On Aug. 19, 2014, 5:46 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24820/
> -----------------------------------------------------------
> 
> (Updated Aug. 19, 2014, 5:46 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-384
>     https://issues.apache.org/jira/browse/SAMZA-384
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> fix useless imports and wrong javadoc words
> 
> 
> add javadocs. remove readable container. make all tests pass.
> 
> 
> add task instance collector that sends immediately.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 3a264ad4bd9ea6ffa801c662dae09ebd7ff79d32
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala d574ac413c0ec81e12eb44b2d0cc0d9843aad434
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 44d5dffb36edd03032bbbd8c13541f18192f2ba2
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 9484ddb50a97eef52ab6ff7c932fdda0ff1ecb0a
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala aae3f8795ef9a12beaefa0917939107102e76b31
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala 444bf37db259d4fccc8ca2d479096c109911d46c
>   samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 86b7f31d1a574f4ee66e8498ecb501a08b94dff5
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala e3c7fe3e2d329b0767eb439144b1ba419848bb96
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 9d5ff1309cd7ae6d41b48870ef7445d3710a2196
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 22c8f0c2954d070aab9ce8d204aa8220d9c7bd74
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala ad39157c6d052b2e14e51b2f8a61d740fc18a129
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 7d0b8db0c3bf1e70fd6af03abb594c7000e6666b
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 851aae6b347b0be2cd2d891fc45030c3e47189d4
> 
> Diff: https://reviews.apache.org/r/24820/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>
