> On March 25, 2016, 10:26 p.m., Navina Ramesh wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, 
> > line 41
> > <https://reviews.apache.org/r/45144/diff/7/?file=1314982#file1314982line41>
> >
> >     Why should the TaskAssignmentManager be a part of the LocalityManager? 
> > It doesn't seem to be doing much other than providing an accessor 
> > getTaskAssignmentManager to GroupByContainerCount. An extension of 
> > AbstractCoordinatorStreamManager typically performs only one function. This 
> > is a deviation that I think is unnecessary. 
> >     
> >     The TaskAssignmentManager could be instantiated in JobCoordinator and 
> > passed to the "balance" call. Do we really need to couple them together?
> 
> Jake Maes wrote:
>     Explained above in response to Jagadish's review:
>     "
>     1. At one end of the spectrum, they could be completely independent, but 
> that would complicate a number of method signatures in the JobCoordinator.
>     2. At the other end of the spectrum, there could be one LocalityManager 
> that is composed of a TaskLocalityManager and a ContainerLocalityManager, 
> each of which handles the coordinator stream interactions. This looks best 
> structurally, but since the current LocalityManager is used in SamzaContainer 
> and the TaskAssignmentManager will not, it's questionable whether this 
> structure fits the usage pattern.
>     3. I chose the middle, where the TaskAssignmentManager is a field of the 
> LocalityManager, which loosely associates them.
>     
>     The latter 2 options both simplify the JobCoordinator, and after the 
> upcoming diff, allow us to pass one manager into the balance() method which 
> contains both the task and container mappings. This will be useful for more 
> intelligent implementations of the balance() method, which might try to 
> reassign tasks from containers that were previously running on the same host 
> to a new container also on that host. 
>     "
> 
> Navina Ramesh wrote:
>     Oops.. I didn't see your previous responses. My comment was mostly 
> questioning whether such an explicit association is required. 
>     
>     On point 1, I don't think it is a major change in the JobCoordinator 
> interfaces. They are all mostly private methods and are going to be 
> refactored as a part of Jagadish's work. 
>     
>     Unlike TaskAssignmentManager, LocalityManager is meant to be accessible 
> by both the Samza container and the job coordinator. TaskAssignmentManager, 
> by contrast, is fully controlled (read & write) only by the Job Coordinator. 
> So, I don't see this affecting the SamzaContainer in any way. If anything, 
> I feel that the SamzaContainer now has a loop-hole to access the 
> TaskAssignmentManager :)
>     
>     In my view, "balance" seems like a specific implementation of 
> TaskNameGrouper that could have been called BalancedGroupByContainerCount. 
> Granted, this would inconvenience existing users of GroupByContainerCount, 
> who would have to change their config.
> 
> Jake Maes wrote:
>     I wasn't concerned about the JC interfaces as much as code 
> readability/maintenance. The LocalityManager is in the signature of many of 
> the methods in JC. TaskAssignmentManager would need to be in almost all of 
> the same signatures. 
>     
>     Agreed on the usage pattern, though I don't assume the availability of 
> the TaskAssignmentManager in the containers is "necessarily" risky. 
> Regardless, it would be even cleaner if the coordinator is ONLY written 
> centrally by the AM/leader. That's why I'll be filing a JIRA to move the 
> LocalityManager writes to the AM/leader. Details to follow...
>     
>     Balance is an extension of group() but should follow the same strategy. 
> GroupByContainerCount.group() assigns tasks to the containers in round-robin 
> fashion. Balance does the same, but minimizes movement from the previous 
> assignments. A different implementation of 
> TaskNameGrouper/BalancingTaskNameGrouper would employ a different strategy 
> for both.
>     
>     Also, the balance() method necessarily has to have a different signature 
> than group() because there's no way (short of breaking some abstractions) to 
> pass the locality/mapping info in from the Factory.
> 
> Navina Ramesh wrote:
>     >> though I don't assume the availability of the TaskAssignmentManager in 
> the containers is "necessarily" risky. 
>     It does pose a potential risk to future implementations. For example, 
> opening up the read & write interface of any CoordinatorStreamManager is 
> actually a poor design choice (historically). This needs to be changed. 
>     
>     >> Regardless, it would be even cleaner if the coordinator is ONLY 
> written centrally by the AM/leader. That's why I'll be filing a JIRA to move 
> the LocalityManager writes to the AM/leader.
>     We can discuss this offline. Short explanation: Write access was 
> intentionally provided to Samza containers in LocalityManager because we want 
> to write the locality of the running container to the coordinator stream ONLY 
> after it successfully gets into the RunLoop. Since the AM/JC/leader does not 
> have any feedback indicating that the container is in the run loop, the write 
> should remain in the container. 
>     
>     >> A different implementation of TaskNameGrouper/BalancingTaskNameGrouper 
> would employ a different strategy for both.
>     I agree. 
>     
>     >> the balance() method necessarily has to have a different signature 
> than group() 
>     Sure. The workaround is to use a factory overload as well, though that is 
> overkill. I am only suggesting that you can instantiate the 
> TaskAssignmentManager in the JC and pass it down to the grouper. That way, it 
> is only a couple of interface changes to the private methods and there is no 
> risk in the container.

Messed up the markdown! Reposting:
*though I don't assume the availability of the TaskAssignmentManager in the 
containers is "necessarily" risky.*
> It does pose a potential risk to future implementations. For example, opening 
> up the read & write interface of any CoordinatorStreamManager is actually a 
> poor design choice (historically). This needs to be changed.

*Regardless, it would be even cleaner if the coordinator is ONLY written 
centrally by the AM/leader. That's why I'll be filing a JIRA to move the 
LocalityManager writes to the AM/leader.*
> We can discuss this offline. Short explanation: Write access was 
> intentionally provided to Samza containers in LocalityManager because we want 
> to write the locality of the running container to the coordinator stream ONLY 
> after it successfully gets into the RunLoop. Since the AM/JC/leader does not 
> have any feedback indicating that the container is in the run loop, the write 
> should remain in the container.
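
To make the ordering argument concrete, here is a minimal Java sketch of a container that records its own locality only after its first pass through the run loop. Everything in it (`LocalityAfterRunLoopSketch`, the `BiConsumer` standing in for a coordinator stream writer, the toy loop) is hypothetical and not Samza's API; it only illustrates why the write sits naturally in the container: the container is the one component that knows it actually got that far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch, not Samza code: a container that publishes its own
// container -> host locality, and only once it has reached the run loop.
// The BiConsumer stands in for a coordinator stream writer.
class LocalityAfterRunLoopSketch {
    private final String containerId;
    private final String host;
    private final BiConsumer<String, String> localityWriter;
    private boolean localityWritten = false;

    LocalityAfterRunLoopSketch(String containerId, String host,
                               BiConsumer<String, String> localityWriter) {
        this.containerId = containerId;
        this.host = host;
        this.localityWriter = localityWriter;
    }

    // Runs startup, then `iterations` passes of a toy run loop.
    // If startup throws, the loop is never reached and nothing is written.
    void run(Runnable startup, int iterations) {
        startup.run(); // e.g. restore stores, start consumers
        for (int i = 0; i < iterations; i++) {
            if (!localityWritten) {
                // First pass through the run loop: the container is known to be
                // running, so this is where its locality gets recorded.
                localityWriter.accept(containerId, host);
                localityWritten = true;
            }
            // ... process one batch of messages ...
        }
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        BiConsumer<String, String> writer = (id, h) -> written.add(id + "->" + h);

        new LocalityAfterRunLoopSketch("0", "host-a", writer).run(() -> { }, 3);
        try {
            new LocalityAfterRunLoopSketch("1", "host-b", writer)
                .run(() -> { throw new IllegalStateException("store restore failed"); }, 3);
        } catch (IllegalStateException expected) {
            // container 1 never reached the run loop
        }
        System.out.println(written); // [0->host-a]
    }
}
```

A coordinator-side writer would have to guess at this point from the outside, which is exactly the missing feedback described above.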

*A different implementation of TaskNameGrouper/BalancingTaskNameGrouper would 
employ a different strategy for both.*
> I agree.
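
For readers following the group()/balance() distinction, here is a hypothetical Java sketch of one balancing strategy: it uses the same per-container sizes a round-robin deal would produce, but first keeps each task on its previous container when that container still exists and has room, then deals the leftovers round-robin. `BalanceSketch` and the int container ids are assumptions for illustration; this is not the GroupByContainerCount implementation under review, and the greedy pass reduces movement without guaranteeing the minimum.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not the GroupByContainerCount code under review.
// Container ids are ints 0..containerCount-1 (an assumption for this sketch).
final class BalanceSketch {
    private BalanceSketch() { }

    static Map<String, Integer> balance(List<String> taskNames,
                                        Map<String, Integer> previous,
                                        int containerCount) {
        if (containerCount <= 0) {
            throw new IllegalArgumentException("containerCount must be positive");
        }
        List<String> sorted = new ArrayList<>(taskNames);
        Collections.sort(sorted); // deterministic iteration order

        // Per-container capacity: the same sizes a round-robin deal would give,
        // so no container holds more than one task more than any other.
        int n = sorted.size();
        int[] capacity = new int[containerCount];
        for (int c = 0; c < containerCount; c++) {
            capacity[c] = n / containerCount + (c < n % containerCount ? 1 : 0);
        }

        Map<String, Integer> result = new TreeMap<>();
        List<String> unassigned = new ArrayList<>();

        // Pass 1: keep a task on its previous container if that container
        // still exists and has room left.
        for (String task : sorted) {
            Integer prev = previous.get(task);
            if (prev != null && prev >= 0 && prev < containerCount && capacity[prev] > 0) {
                result.put(task, prev);
                capacity[prev]--;
            } else {
                unassigned.add(task);
            }
        }

        // Pass 2: deal the remaining tasks round-robin over containers with
        // spare capacity. Remaining capacity always equals the number of
        // unassigned tasks, so the inner loop terminates.
        int c = 0;
        for (String task : unassigned) {
            while (capacity[c] == 0) {
                c = (c + 1) % containerCount;
            }
            result.put(task, c);
            capacity[c]--;
            c = (c + 1) % containerCount;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("t0", "t1", "t2", "t3", "t4", "t5");
        Map<String, Integer> four = balance(tasks, Map.of(), 4); // first run: no history
        Map<String, Integer> three = balance(tasks, four, 3);    // 4 -> 3 containers
        Map<String, Integer> five = balance(tasks, three, 5);    // 3 -> 5 containers
        System.out.println(four);  // {t0=0, t1=1, t2=2, t3=3, t4=0, t5=1}
        System.out.println(three); // {t0=0, t1=1, t2=2, t3=2, t4=0, t5=1}
        System.out.println(five);  // {t0=0, t1=1, t2=2, t3=3, t4=0, t5=4}
    }
}
```

With no history, the result is a plain round-robin, matching what group() would do. The `main` method replays a 4->3->5 resize: going 4->3 moves only `t3`, and 3->5 moves `t3` and `t5`.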

*the balance() method necessarily has to have a different signature than 
group()*
> Sure. The workaround is to use a factory overload as well, though that is 
> overkill. I am only suggesting that you can instantiate the 
> TaskAssignmentManager in the JC and pass it down to the grouper. That way, it 
> is only a couple of interface changes to the private methods and there is no 
> risk in the container.
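
Here is a minimal Java sketch of the wiring suggested here, using assumed names: `TaskAssignmentStore` stands in for TaskAssignmentManager, `BalancingGrouper` for the balancing grouper, and `CoordinatorWiringSketch.assign` for the private JobCoordinator method. None of these are Samza APIs. The store is created and written only on the coordinator side and handed to the grouper as an argument, so nothing on the container path ever holds a reference to it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for TaskAssignmentManager (task name -> container id).
interface TaskAssignmentStore {
    Map<String, Integer> readTaskAssignment();
    void writeTaskContainerMapping(String taskName, int containerId);
}

// Hypothetical stand-in for a balancing grouper: the store is passed in
// explicitly instead of being reached through LocalityManager.
interface BalancingGrouper {
    Map<String, Integer> balance(List<String> taskNames, int containerCount,
                                 TaskAssignmentStore store);
}

final class InMemoryTaskAssignmentStore implements TaskAssignmentStore {
    private final Map<String, Integer> mapping = new HashMap<>();

    @Override
    public Map<String, Integer> readTaskAssignment() {
        return new HashMap<>(mapping); // defensive copy
    }

    @Override
    public void writeTaskContainerMapping(String taskName, int containerId) {
        mapping.put(taskName, containerId);
    }
}

// Toy grouper for the demo: keep a task's previous container if it still
// exists, otherwise fall back to round-robin by position. It does not balance.
final class KeepOrRoundRobinGrouper implements BalancingGrouper {
    @Override
    public Map<String, Integer> balance(List<String> taskNames, int containerCount,
                                        TaskAssignmentStore store) {
        Map<String, Integer> previous = store.readTaskAssignment();
        Map<String, Integer> out = new TreeMap<>();
        for (int i = 0; i < taskNames.size(); i++) {
            String task = taskNames.get(i);
            Integer prev = previous.get(task);
            out.put(task, prev != null && prev < containerCount ? prev : i % containerCount);
        }
        return out;
    }
}

// The job coordinator alone constructs the store, passes it to the grouper,
// and persists the result; containers never receive it.
final class CoordinatorWiringSketch {
    private CoordinatorWiringSketch() { }

    static Map<String, Integer> assign(List<String> tasks, int containerCount,
                                       BalancingGrouper grouper, TaskAssignmentStore store) {
        Map<String, Integer> assignment = grouper.balance(tasks, containerCount, store);
        assignment.forEach((task, container) -> store.writeTaskContainerMapping(task, container));
        return assignment;
    }
}
```

The trade-off Jake raised still shows up in this shape: every coordinator method that needs the mapping takes one more parameter instead of reaching it through LocalityManager.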


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45144/#review125481
-----------------------------------------------------------


On March 25, 2016, 5:30 p.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45144/
> -----------------------------------------------------------
> 
> (Updated March 25, 2016, 5:30 p.m.)
> 
> 
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan 
> (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Persist the task-to-container mapping in the coordinator stream and use it to 
> minimize the reassignment when the container count changes.
> 
> A new BalancingTaskNameGrouper interface exposes the balance() method, which 
> can be implemented pluggably. 
> GroupByContainerCount has been rewritten in Java and the balance() 
> functionality added. This is because the balance logic is specific to a 
> grouper.
> 
> Detailed changes:
> import-control.xml - Update imports that weren't needed in Scala and add some 
> for tests.
> LocalityManager.java - Add TaskAssignmentManager. This mostly just keeps the 
> JobCoordinator code cleaner, but also associates the 2 managers that hold 
> host-affinity info.
> GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements 
> BalancingTaskNameGrouper
> GroupByContainerCountFactory - Rewritten in Java
> TestStorageRecovery - Old test depended on the order of the partitions in a 
> container. Now it doesn't.
> TestGroupByContainerCount - Rewritten in Java and lots of tests added for 
> balance()
> BalancingTaskNameGrouper - New interface for the balance method. Exposes the 
> new functionality without breaking backward compatibility
> TaskAssignmentManager - Coordinator stream manager for task-to-container 
> mapping
> TaskNameGroupBalancer - Bridges the task mapping (balance) capability with 
> taskname groupers, old and new
> SetTaskContainerMapping - Coordinator stream message for the task-to-container 
> mapping
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java acf93525ea5c97df187bbe7977e2ae9fea65b32b 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java 211b64215f26db49cd4411ff3fb41231145307d7 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 4d093b500b7f3b582446634ced3e9d0b28371616 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1 
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 06a96ad6ed786c22924017f894413bfa1ea34c06 
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java e0d4aa1016d61ce328d7ff74b58f7b8f7682f386 
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java 429573b480112c7491303dc410d78f37a308c4a7 
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc 
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 6e9c6fa579a5901000bea0601c771783d8334f0e 
> 
> Diff: https://reviews.apache.org/r/45144/diff/
> 
> 
> Testing
> -------
> 
> A bunch of new unit tests have been added. 
> 
> Also tested with a test job. The task mapping (initially missing) is added 
> the first time the job is run. It is then used as expected to reduce task 
> reassignment as the container count was adjusted from 4->3->5 on subsequent 
> runs.
> 
> 
> Thanks,
> 
> Jake Maes
> 
>
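
The description boils down to storing the task-to-container mapping as messages in the coordinator stream and reading it back on the next run. Here is a minimal Java sketch of that read path, assuming last-write-wins replay and treating a null container id as a deletion. `TaskMappingMessage` and `TaskMappingLog` are hypothetical and only loosely modeled on SetTaskContainerMapping and TaskAssignmentManager; they are not the classes in this diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical message shape, loosely modeled on the idea behind
// SetTaskContainerMapping: key = task name, value = container id.
final class TaskMappingMessage {
    final String taskName;
    final String containerId; // null is treated as "mapping deleted" (an assumption)

    TaskMappingMessage(String taskName, String containerId) {
        this.taskName = taskName;
        this.containerId = containerId;
    }
}

// Minimal sketch: an append-only log of mapping messages, read back by replay.
final class TaskMappingLog {
    private final List<TaskMappingMessage> messages = new ArrayList<>();

    void append(String taskName, String containerId) {
        messages.add(new TaskMappingMessage(taskName, containerId));
    }

    // Replays the log from the start; the most recent message for a task wins.
    Map<String, String> readTaskAssignment() {
        Map<String, String> mapping = new HashMap<>();
        for (TaskMappingMessage m : messages) {
            if (m.containerId == null) {
                mapping.remove(m.taskName);
            } else {
                mapping.put(m.taskName, m.containerId);
            }
        }
        return mapping;
    }
}
```

On a first run the log is empty, so readTaskAssignment() returns an empty map and there is no previous assignment to balance against, which lines up with the "initially missing" mapping in the testing notes. Later runs see whatever was appended last for each task.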
