> On March 25, 2016, 10:26 p.m., Navina Ramesh wrote: > > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, > > line 41 > > <https://reviews.apache.org/r/45144/diff/7/?file=1314982#file1314982line41> > > > > Why should the TaskAssignmentManager be a part of the LocalityManager? > > It doesn't seem to be doing much other than providing an accessor > > getTaskAssignmentManager to GroupByContainerCount. An extension of > > AbstractCoordinatorStreamManager typically performs only one function. This > > is a deviation that I think is not necessary. > > > > The TaskAssignmentManager could be instantiated in JobCoordinator and > > passed to the "balance" call. Do we really need to couple them together? > > Jake Maes wrote: > Explained above in response to Jagadish's review: > " > 1. At one end of the spectrum, they could be completely independent, but > that would complicate a number of method signatures in the JobCoordinator. > 2. At the other end of the spectrum, there could be one LocalityManager > that is composed of a TaskLocalityManager and a ContainerLocalityManager, > each of which handles the coordinator stream interactions. This looks best > structurally, but since the current LocalityManager is used in SamzaContainer > and the TaskAssignmentManager will not be, it's questionable whether this > structure fits the usage pattern. > 3. I chose the middle, where the TaskAssignmentManager is a field of the > LocalityManager, which loosely associates them. > > The latter 2 options both simplify the JobCoordinator, and after the > upcoming diff, allow us to pass one manager into the balance() method which > contains both the task and container mappings. This will be useful for more > intelligent implementations of the balance() method, which might try to > reassign tasks from containers that were previously running on the same host > to a new container also on that host. > " > > Navina Ramesh wrote: > Oops... I didn't see your previous responses.
My comment was mostly > questioning whether such an explicit association is required. > > On point 1, I don't think it is a major change in the JobCoordinator > interfaces. They are all mostly private methods and are going to be > refactored as a part of Jagadish's work. > > Unlike TaskAssignmentManager, LocalityManager is meant to be accessible > by both the Samza container and the job coordinator. This is not the case with > TaskAssignmentManager as it is fully controlled (read & write) only by the > Job Coordinator. So, I don't see this affecting the SamzaContainer in any way. > If anything, I feel that the SamzaContainer now has a loophole to access the > TaskAssignmentManager :) > > In my view, "balance" seems like a specific implementation of > TaskNameGrouper that could have been called BalancedGroupByContainerCount. > Granted, this will inconvenience existing users of > GroupByContainerCount, who would have to change their config. > > Jake Maes wrote: > I wasn't concerned about the JC interfaces as much as code > readability/maintenance. The LocalityManager is in the signature of many of > the methods in JC. TaskAssignmentManager would need to be in almost all of > the same signatures. > > Agreed on the usage pattern, though I don't assume the availability of > the TaskAssignmentManager in the containers is "necessarily" risky. > Regardless, it would be even cleaner if the coordinator is ONLY written > centrally by the AM/leader. That's why I'll be filing a JIRA to move the > LocalityManager writes to the AM/leader. Details to follow... > > Balance is an extension of group() but should follow the same strategy. > GroupByContainerCount.group() assigns tasks to the containers in round-robin > fashion. Balance does the same, but minimizes movement from the previous > assignments. A different implementation of > TaskNameGrouper/BalancingTaskNameGrouper would employ a different strategy > for both.
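To make the group()/balance() distinction concrete, here is a minimal Java sketch. It assumes a deliberately simplified, hypothetical model: task names are Strings, container IDs are ints, and SimpleTaskGrouper, SimpleBalancingGrouper and RoundRobinGrouper are made-up names, not the GroupByContainerCount or BalancingTaskNameGrouper code in this diff. group() deals tasks out round-robin; balance() targets the same per-container task counts, but first keeps every task whose previous container still exists and has room, and only places the leftovers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified interfaces (NOT Samza's TaskNameGrouper API).
interface SimpleTaskGrouper {
  Map<String, Integer> group(List<String> tasks);
}

// balance() needs the previous assignment, so its signature differs from group().
interface SimpleBalancingGrouper extends SimpleTaskGrouper {
  Map<String, Integer> balance(List<String> tasks, Map<String, Integer> previous);
}

final class RoundRobinGrouper implements SimpleBalancingGrouper {
  private final int containerCount;

  RoundRobinGrouper(int containerCount) {
    if (containerCount <= 0) {
      throw new IllegalArgumentException("containerCount must be positive");
    }
    this.containerCount = containerCount;
  }

  // Deal tasks out to containers 0..containerCount-1 in round-robin order.
  @Override
  public Map<String, Integer> group(List<String> tasks) {
    Map<String, Integer> assignment = new TreeMap<>();
    for (int i = 0; i < tasks.size(); i++) {
      assignment.put(tasks.get(i), i % containerCount);
    }
    return assignment;
  }

  // Same per-container counts as group(), but tasks stay put whenever possible.
  @Override
  public Map<String, Integer> balance(List<String> tasks, Map<String, Integer> previous) {
    int n = tasks.size();
    int[] capacity = new int[containerCount];
    for (int c = 0; c < containerCount; c++) {
      capacity[c] = n / containerCount + (c < n % containerCount ? 1 : 0);
    }

    Map<String, Integer> assignment = new TreeMap<>();
    List<String> unplaced = new ArrayList<>();
    // Pass 1: keep a task on its previous container if it still exists and has room.
    for (String task : tasks) {
      Integer prev = previous.get(task);
      if (prev != null && prev >= 0 && prev < containerCount && capacity[prev] > 0) {
        assignment.put(task, prev);
        capacity[prev]--;
      } else {
        unplaced.add(task);
      }
    }
    // Pass 2: fill the remaining slots, lowest container ID first.
    // Total capacity equals n, so a free slot always exists here.
    int c = 0;
    for (String task : unplaced) {
      while (capacity[c] == 0) {
        c++;
      }
      assignment.put(task, c);
      capacity[c]--;
    }
    return assignment;
  }
}
```

For example, with eight tasks first grouped onto 4 containers, balance() onto 3 containers moves only the 2 tasks that lived on the removed container, whereas a fresh round-robin group() onto 3 containers would move 5 of the 8.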
> > Also, the balance() method necessarily has to have a different signature > than group() because there's no way (or would break some abstractions) to > pass the locality/mapping info in from the Factory. > > Navina Ramesh wrote: > >> though I don't assume the availability of the TaskAssignmentManager in > the containers is "necessarily" risky. > It does pose a potential risk to future implementations. For example, > opening up the read & write interface of any CoordinatorStreamManager is > actually a poor design choice (historically). This needs to be changed. > > >> Regardless, it would be even cleaner if the coordinator is ONLY > written centrally by the AM/leader. That's why I'll be filing a JIRA to move > the LocalityManager writes to the AM/leader. > We can discuss this offline. Short explanation: Write access was > intentionally provided to Samza containers in LocalityManager because we want > to write the locality of the running container to the coordinator stream ONLY > after it successfully gets into the RunLoop. Since the AM/JC/leader does not > have any feedback indicating it is in the run loop, it should remain in the > container. > > >> A different implementation of TaskNameGrouper/BalancingTaskNameGrouper > would employ a different strategy for both. > I agree. > > >> the balance() method necessarily has to have a different signature > than group() > Sure. The workaround is to use a factory overload as well, though it is > an overkill. I am only suggesting that you can instantiate the > TaskAssignmentManager in the JC and pass it down to the grouper. That way, it > is only a couple of interface changes to the private method and there is no > risk in the container. > > Navina Ramesh wrote: > Messed up the markdown! Reposting: > *though I don't assume the availability of the TaskAssignmentManager in > the containers is "necessarily" risky.* > > It does pose a potential risk to future implementations. 
For example, > opening up the read & write interface of any CoordinatorStreamManager is > actually a poor design choice (historically). This needs to be changed. > > *Regardless, it would be even cleaner if the coordinator is ONLY written > centrally by the AM/leader. That's why I'll be filing a JIRA to move the > LocalityManager writes to the AM/leader.* > > We can discuss this offline. Short explanation: Write access was > intentionally provided to Samza containers in LocalityManager because we want > to write the locality of the running container to the coordinator stream ONLY > after it successfully gets into the RunLoop. Since the AM/JC/leader does not > have any feedback indicating it is in the run loop, it should remain in the > container. > > *A different implementation of TaskNameGrouper/BalancingTaskNameGrouper > would employ a different strategy for both* > > I agree. > > *the balance() method necessarily has to have a different signature than > group()* > > Sure. The workaround is to use a factory overload as well, though it is > an overkill. I am only suggesting that you can instantiate the > TaskAssignmentManager in the JC and pass it down to the grouper. That way, it > is only a couple of interface changes to the private method and there is no > risk in the container. > > Jake Maes wrote: > Can you provide examples of the historical problems caused by opening > read & write to the coordinator stream? Without them, I disagree that it's > a poor design choice, especially if one agrees that the coordinator stream > should only be written centrally. > > The original intent for writing the locality from the container was not > overlooked. That's why the proposal to move it to the AM/leader will depend > on SAMZA-871
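One way to picture the wiring being debated is a small Java sketch in which the task-assignment manager is a field of the locality manager, but only the job coordinator ever supplies it. This is a hypothetical, heavily simplified model: TaskAssignmentStore and HostLocalityManager are made-up stand-ins for TaskAssignmentManager and LocalityManager, backed by in-memory maps rather than the coordinator stream. The JC-side constructor injects the store; the no-arg constructor, assumed here to be the one used on the container side, leaves it empty, so a container has no handle for reading or writing task mappings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for TaskAssignmentManager (NOT Samza's class).
final class TaskAssignmentStore {
  private final Map<String, String> taskToContainer = new HashMap<>();

  void write(String taskName, String containerId) {
    taskToContainer.put(taskName, containerId);
  }

  String containerFor(String taskName) {
    return taskToContainer.get(taskName);
  }
}

// Hypothetical stand-in for LocalityManager (NOT Samza's class).
final class HostLocalityManager {
  private final Map<String, String> containerToHost = new HashMap<>();
  // Non-null only when the job coordinator constructs this object.
  private final TaskAssignmentStore taskAssignments;

  // Container-side constructor: no task-assignment access at all.
  HostLocalityManager() {
    this(null);
  }

  // Coordinator-side constructor: the JC injects the task-assignment store.
  HostLocalityManager(TaskAssignmentStore taskAssignments) {
    this.taskAssignments = taskAssignments;
  }

  void writeContainerHost(String containerId, String host) {
    containerToHost.put(containerId, host);
  }

  String hostFor(String containerId) {
    return containerToHost.get(containerId);
  }

  // Empty on the container side, so containers cannot reach task mappings.
  Optional<TaskAssignmentStore> taskAssignments() {
    return Optional.ofNullable(taskAssignments);
  }
}
```

Passing the store as a separate argument instead would avoid the optional field, at the cost of an extra parameter on every JC method that currently takes only the LocalityManager; that is the trade-off under discussion in this thread.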
> Sure. The workaround is to use a factory overload as well, though it is an > overkill. I am only suggesting > that you can instantiate the TaskAssignmentManager in the JC and pass it down > to the grouper. That way, > it is only a couple of interface changes to the private method and there is > no risk in the container. I could do that, but it would muddle up the signatures of a bunch of JC methods with unnecessary args, especially since balance() would need to take both the LocalityManager AND the TaskAssignmentManager as arguments (see explanation in my original reply). Both pieces of locality info could be needed for some implementations of balance. Since they're both locality, I think it's reasonable to have both in the class called LocalityManager and simplify a bunch of method/interface signatures. If there's significant enough risk of the TaskAssignmentManager being used in the containers, I think the compromise is passing it into the LocalityManager constructor from the JC, but not in the container. In any case, we are always one code change away from someone using TaskAssignmentManager in the containers, no matter how easy or hard it is. That's why we have code reviews! :-) - Jake ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/45144/#review125481 ----------------------------------------------------------- On March 25, 2016, 5:30 p.m., Jake Maes wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/45144/ > ----------------------------------------------------------- > > (Updated March 25, 2016, 5:30 p.m.) > > > Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan > (Data Infrastructure). 
> > > Repository: samza > > > Description > ------- > > Persist the task-to-container mapping in the coordinator stream and use it to > minimize the reassignment when the container count changes. > > A new BalancingTaskNameGrouper interface exposes the balance() method, which > can be implemented pluggably. > GroupByContainerCount has been rewritten in Java and the balance() > functionality added. This is because the balance logic is specific to a > grouper. > > Detailed changes: > import-control.xml - Update imports that weren't needed in Scala and add some > for tests. > LocalityManager.java - Add TaskAssignmentManager. This mostly just keeps the > JobCoordinator code cleaner, but also associates the 2 managers for Host > Affinity info. > GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements > BalancingTaskNameGrouper > GroupByContainerCountFactory - Rewritten in Java > TestStorageRecovery - Old test depended on the order of the partitions in a > container. Now it doesn't. > TestGroupByContainerCount - Rewritten in Java and lots of tests added for > balance() > BalancingTaskNameGrouper - New interface for the balance method.
Exposes the > new functionality without breaking backward compatibility > TaskAssignmentManager - Coordinator stream manager for task-to-container > mapping > TaskNameGroupBalancer - Bridges the task mapping (balance) capability with > taskname groupers, old and new > SetTaskContainerMapping - Coordinator stream message for the task-to-container > mapping > > > Diffs > ----- > > checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java > acf93525ea5c97df187bbe7977e2ae9fea65b32b > > samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java > 211b64215f26db49cd4411ff3fb41231145307d7 > > samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java > 4d093b500b7f3b582446634ced3e9d0b28371616 > > samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java > PRE-CREATION > > samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala > cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1 > > samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala > 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6 > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala > 06a96ad6ed786c22924017f894413bfa1ea34c06 > > samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java > PRE-CREATION > >
samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java > e0d4aa1016d61ce328d7ff74b58f7b8f7682f386 > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java > 429573b480112c7491303dc410d78f37a308c4a7 > samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java > 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc > > samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala > 6e9c6fa579a5901000bea0601c771783d8334f0e > > Diff: https://reviews.apache.org/r/45144/diff/ > > > Testing > ------- > > A bunch of new unit tests have been added. > > Also tested with a test job. The task mapping (initially missing) is added > the first time the job is run. It is then used as expected to reduce task > reassignment as the container count was adjusted from 4->3->5 on subsequent > runs. > > > Thanks, > > Jake Maes > >
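A rough sketch of the persistence idea in the description, under stated assumptions: a hypothetical, in-memory stand-in for a coordinator-stream-backed manager, where the "stream" is an append-only list of task-to-container messages loosely playing the role of SetTaskContainerMapping. On startup the manager replays the whole log and keeps the last value per task, and it appends only when a mapping actually changes. TaskContainerMessage, InMemoryTaskMappingManager, readMapping and writeMapping are illustrative names, not the TaskAssignmentManager API in this diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical message type, loosely modeled on the role of SetTaskContainerMapping.
final class TaskContainerMessage {
  final String taskName;
  final String containerId; // null marks a deleted mapping

  TaskContainerMessage(String taskName, String containerId) {
    this.taskName = taskName;
    this.containerId = containerId;
  }
}

// Hypothetical in-memory stand-in for a coordinator-stream-backed manager.
final class InMemoryTaskMappingManager {
  private final List<TaskContainerMessage> log; // append-only "stream"
  private final Map<String, String> current = new HashMap<>();

  InMemoryTaskMappingManager(List<TaskContainerMessage> log) {
    this.log = log;
    // Bootstrap by replaying the whole log; the last message for a task wins.
    for (TaskContainerMessage m : log) {
      apply(m);
    }
  }

  Map<String, String> readMapping() {
    return Collections.unmodifiableMap(new HashMap<>(current));
  }

  // Append a message only when the mapping actually changes.
  void writeMapping(String taskName, String containerId) {
    if (Objects.equals(current.get(taskName), containerId)) {
      return;
    }
    TaskContainerMessage m = new TaskContainerMessage(taskName, containerId);
    log.add(m);
    apply(m);
  }

  private void apply(TaskContainerMessage m) {
    if (m.containerId == null) {
      current.remove(m.taskName);
    } else {
      current.put(m.taskName, m.containerId);
    }
  }
}
```

An empty log yields an empty mapping, which corresponds to the "initially missing" case on a job's first run; a second manager built over the same log sees every mapping the first one wrote.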