> On March 25, 2016, 10:26 p.m., Navina Ramesh wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, 
> > line 41
> > <https://reviews.apache.org/r/45144/diff/7/?file=1314982#file1314982line41>
> >
> >     Why should the TaskAssignmentManager be a part of the LocalityManager? 
> > It doesn't seem to be doing much other than providing an accessor 
> > getTaskAssignmentManager to GroupByContainerCount. An extension of 
> > AbstractCoordinaotStreamManager typically performs only one function. This 
> > is deviation that I think is not necessary. 
> >     
> >     The TaskAssignmentManager could be instantiated in JobCoordinator and 
> > passed to the "balance" call. Do we really need to couple them together?

Explained above in response to Jagadish's review:
"
1. At one end of the spectrum, they could be completely independent, but that 
would complicate a number of method signatures in the JobCoordinator
2. At the other end of the spectrum, there could be one LocalityManager that is 
composed of a TaskLocalityManager and a ContainerLocalityManager, each of which 
handle the coordinator stream interactions. This looks best structurally, but 
since the current LocalityManager is used in SamzaContainer and the 
TaskAssignmentManager will not, it's questionable whether this structure fits 
the usage pattern.
3. I chose the middle, where the TaskAssignmentManager is a field of the 
LocalityManger, which loosely associates them.

The latter 2 options both simplify the JobCoordinator, and after the upcoming 
diff, allow us to pass one manager into the balance() method which contains 
both the task and container mappings. This will be useful for more intelligent 
implementations of the balance() method, which might try to reassign tasks from 
containers that were previously running on the same host to a new container 
also on that host. 
"


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45144/#review125481
-----------------------------------------------------------


On March 25, 2016, 5:30 p.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45144/
> -----------------------------------------------------------
> 
> (Updated March 25, 2016, 5:30 p.m.)
> 
> 
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan 
> (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Persist the task-to-container mapping in the coordinator stream and use it to 
> minimize the reassignment when the container count changes.
> 
> A new BalancingTaskNameGrouper interface exposes the balance() method, which 
> can be implemented pluggably. 
> GroupByContainerCount has been rewritten in java and the balance() 
> functionality added. This is because the balance logic is specific to a 
> grouper.
> 
> Detailed changes:
> import-control.xml - Update imports that weren't needed in scala and add some 
> for tests.
> LocalityManager.java - Add TaskAssignment manager. This mostly just keeps the 
> JobCoordinator code cleaner, but also associates the 2 managers for Host 
> Affinity info.
> GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements 
> BalancingTaskNameGrouper
> GroupByContainerCountFactory - Rewritten in Java
> TestStorageRecovery - Old test depended on the order of the partitions in a 
> container. Now it doesn't.
> TestGroupByContainerCount - Rewritten in Java and lots of tests added for 
> balance()
> BalancingTaskNameGrouper - New interface for the balance method. Exposes the 
> new functionality without breaking backward compatibility
> TaskAssignmentManager - Coordinator stream manager for task-to-container 
> mapping
> TaskNameGroupBalancer - Bridges the task mapping (balance) capability with 
> taskname groupers, old and new
> SetTaskContainerMapping - Coordinator strem message for the task-to-container 
> mapping
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
> acf93525ea5c97df187bbe7977e2ae9fea65b32b 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
>  211b64215f26db49cd4411ff3fb41231145307d7 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
>  4d093b500b7f3b582446634ced3e9d0b28371616 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
>  cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1 
>   
> samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala
>  8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 06a96ad6ed786c22924017f894413bfa1ea34c06 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java 
> PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
>  e0d4aa1016d61ce328d7ff74b58f7b8f7682f386 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
>  429573b480112c7491303dc410d78f37a308c4a7 
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 
> 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc 
>   
> samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
>  6e9c6fa579a5901000bea0601c771783d8334f0e 
> 
> Diff: https://reviews.apache.org/r/45144/diff/
> 
> 
> Testing
> -------
> 
> A bunch of new unit tests have been added. 
> 
> Also tested with a test job. The task mapping (initially missing) is added 
> the first time the job is run. It is then used as expected to reduce task 
> reassignment as the container count was adjusted from 4->3->5 on subsequent 
> runs.
> 
> 
> Thanks,
> 
> Jake Maes
> 
>

Reply via email to