> On March 25, 2016, 10:26 p.m., Navina Ramesh wrote: > > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, > > line 41 > > <https://reviews.apache.org/r/45144/diff/7/?file=1314982#file1314982line41> > > > > Why should the TaskAssignmentManager be a part of the LocalityManager? > > It doesn't seem to be doing much other than providing an accessor > > getTaskAssignmentManager to GroupByContainerCount. An extension of > > AbstractCoordinatorStreamManager typically performs only one function. This > > is a deviation that I think is unnecessary. > > > > The TaskAssignmentManager could be instantiated in the JobCoordinator and > > passed to the "balance" call. Do we really need to couple them together? > > Jake Maes wrote: > Explained above in response to Jagadish's review: > " > 1. At one end of the spectrum, they could be completely independent, but > that would complicate a number of method signatures in the JobCoordinator. > 2. At the other end of the spectrum, there could be one LocalityManager > that is composed of a TaskLocalityManager and a ContainerLocalityManager, > each of which handles the coordinator stream interactions. This looks best > structurally, but since the current LocalityManager is used in SamzaContainer > and the TaskAssignmentManager will not be, it's questionable whether this > structure fits the usage pattern. > 3. I chose the middle, where the TaskAssignmentManager is a field of the > LocalityManager, which loosely associates them. > > The latter 2 options both simplify the JobCoordinator, and after the > upcoming diff, allow us to pass one manager into the balance() method which > contains both the task and container mappings. This will be useful for more > intelligent implementations of the balance() method, which might try to > reassign tasks from containers that were previously running on the same host > to a new container also on that host. > " > > Navina Ramesh wrote: > Oops. I didn't see your previous responses.
My comment was mostly > questioning whether such an explicit association is required. > > On point 1, I don't think it is a major change in the JobCoordinator > interfaces. They are mostly private methods and are going to be > refactored as a part of Jagadish's work. > > Unlike TaskAssignmentManager, LocalityManager is meant to be accessible > by both the Samza container and the job coordinator. This is not the case with > TaskAssignmentManager, as it is fully controlled (read & write) only by the > Job Coordinator. So, I don't see this affecting the SamzaContainer in any way. > If anything, I feel that the SamzaContainer now has a loophole to access the > TaskAssignmentManager :) > > In my view, "balance" seems like a specific implementation of > TaskNameGrouper that could have been called BalancedGroupByContainerCount. > Granted, this would inconvenience existing users of > GroupByContainerCount, who would need to change their config. > > Jake Maes wrote: > I wasn't concerned about the JC interfaces as much as code > readability/maintenance. The LocalityManager is in the signature of many of > the methods in JC. TaskAssignmentManager would need to be in almost all of > the same signatures. > > Agreed on the usage pattern, though I don't assume the availability of > the TaskAssignmentManager in the containers is "necessarily" risky. > Regardless, it would be even cleaner if the coordinator is ONLY written > centrally by the AM/leader. That's why I'll be filing a JIRA to move the > LocalityManager writes to the AM/leader. Details to follow... > > Balance is an extension of group() but should follow the same strategy. > GroupByContainerCount.group() assigns tasks to the containers in round-robin > fashion. Balance does the same, but minimizes movement from the previous > assignments. A different implementation of > TaskNameGrouper/BalancingTaskNameGrouper would employ a different strategy > for both.
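To make the "same strategy, minimal movement" idea concrete, here is a minimal sketch. It assumes task names are plain strings, containers are 0-based integer ids, and "minimizing movement" means keeping each task on its previous container whenever that container still exists and still has room under the per-container sizes a round-robin group() would produce. `RoundRobinBalanceSketch`, `group`, `balance`, and `moved` are hypothetical names for illustration, not the GroupByContainerCount code in this patch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (not the patch's GroupByContainerCount): round-robin group() plus a
 * balance() that keeps the same per-container sizes but moves as few tasks as possible.
 */
final class RoundRobinBalanceSketch {

  /** Sorts the task names and deals them out to containers 0..containerCount-1 in turn. */
  static Map<String, Integer> group(Collection<String> tasks, int containerCount) {
    if (containerCount <= 0) {
      throw new IllegalArgumentException("containerCount must be positive");
    }
    List<String> sorted = new ArrayList<>(tasks);
    Collections.sort(sorted);
    Map<String, Integer> assignment = new TreeMap<>();
    for (int i = 0; i < sorted.size(); i++) {
      assignment.put(sorted.get(i), i % containerCount);
    }
    return assignment;
  }

  /** Re-targets a previous assignment to containerCount containers, keeping tasks in place where possible. */
  static Map<String, Integer> balance(Map<String, Integer> previous, int containerCount) {
    if (containerCount <= 0) {
      throw new IllegalArgumentException("containerCount must be positive");
    }
    int taskCount = previous.size();
    // Same sizes group() would produce: the first (taskCount % containerCount) containers get one extra task.
    int[] capacity = new int[containerCount];
    for (int c = 0; c < containerCount; c++) {
      capacity[c] = taskCount / containerCount + (c < taskCount % containerCount ? 1 : 0);
    }
    Map<String, Integer> result = new TreeMap<>();
    List<String> displaced = new ArrayList<>();
    // Pass 1 (in task-name order): a task stays put if its old container still exists and has room.
    for (Map.Entry<String, Integer> e : new TreeMap<>(previous).entrySet()) {
      int old = e.getValue();
      if (old >= 0 && old < containerCount && capacity[old] > 0) {
        result.put(e.getKey(), old);
        capacity[old]--;
      } else {
        displaced.add(e.getKey());
      }
    }
    // Pass 2: displaced tasks fill the remaining slots; total free capacity equals displaced.size().
    int c = 0;
    for (String task : displaced) {
      while (capacity[c] == 0) {
        c++;
      }
      result.put(task, c);
      capacity[c]--;
    }
    return result;
  }

  /** Counts tasks whose container differs between two assignments. */
  static int moved(Map<String, Integer> before, Map<String, Integer> after) {
    int count = 0;
    for (Map.Entry<String, Integer> e : after.entrySet()) {
      if (!e.getValue().equals(before.get(e.getKey()))) {
        count++;
      }
    }
    return count;
  }
}
```

With eight tasks `t0`..`t7`, going from 4 to 3 containers via `balance` moves 2 tasks, while calling `group` afresh with 3 containers moves 5 of the 8. Because the capacities mirror round-robin sizes, the balanced result has the same shape a fresh group() would have; only the task placement differs.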
> > Also, the balance() method necessarily has to have a different signature > than group() because there's no way (short of breaking some abstractions) to > pass the locality/mapping info in from the Factory. > > Navina Ramesh wrote: > >> though I don't assume the availability of the TaskAssignmentManager in > the containers is "necessarily" risky. > It does pose a potential risk to future implementations. For example, > opening up the read & write interface of any CoordinatorStreamManager is > actually a poor design choice (historically). This needs to be changed. > > >> Regardless, it would be even cleaner if the coordinator is ONLY > written centrally by the AM/leader. That's why I'll be filing a JIRA to move > the LocalityManager writes to the AM/leader. > We can discuss this offline. Short explanation: Write access was > intentionally provided to Samza containers in LocalityManager because we want > to write the locality of the running container to the coordinator stream ONLY > after it successfully gets into the RunLoop. Since the AM/JC/leader does not > have any feedback indicating it is in the run loop, it should remain in the > container. > > >> A different implementation of TaskNameGrouper/BalancingTaskNameGrouper > would employ a different strategy for both. > I agree. > > >> the balance() method necessarily has to have a different signature > than group() > Sure. The workaround is to use a factory overload as well, though it is > an overkill. I am only suggesting that you can instantiate the > TaskAssignmentManager in the JC and pass it down to the grouper. That way, it > is only a couple of interface changes to the private method and there is no > risk in the container. > > Navina Ramesh wrote: > Messed up the markdown! Reposting: > *though I don't assume the availability of the TaskAssignmentManager in > the containers is "necessarily" risky.* > > It does pose a potential risk to future implementations.
For example, > opening up the read & write interface of any CoordinatorStreamManager is > actually a poor design choice (historically). This needs to be changed. > > *Regardless, it would be even cleaner if the coordinator is ONLY written > centrally by the AM/leader. That's why I'll be filing a JIRA to move the > LocalityManager writes to the AM/leader.* > > We can discuss this offline. Short explanation: Write access was > intentionally provided to Samza containers in LocalityManager because we want > to write the locality of the running container to the coordinator stream ONLY > after it successfully gets into the RunLoop. Since the AM/JC/leader does not > have any feedback indicating it is in the run loop, it should remain in the > container. > > *A different implementation of TaskNameGrouper/BalancingTaskNameGrouper > would employ a different strategy for both* > > I agree. > > *the balance() method necessarily has to have a different signature than > group()* > > Sure. The workaround is to use a factory overload as well, though it is > an overkill. I am only suggesting that you can instantiate the > TaskAssignmentManager in the JC and pass it down to the grouper. That way, it > is only a couple of interface changes to the private method and there is no > risk in the container. > > Jake Maes wrote: > Can you provide examples of the historical problems caused by opening > read & write access to the coordinator stream? Without them, I disagree that it's > a poor design choice, especially if one agrees that the coordinator stream > should only be written centrally. > > The original intent for writing the locality from the container was not > overlooked. That's why the proposal to move it to the AM/leader will depend > on SAMZA-871. > > Jake Maes wrote: > > Sure. The workaround is to use a factory overload as well, though it is > an overkill. I am only suggesting > > that you can instantiate the TaskAssignmentManager in the JC and pass > it down to the grouper.
That way, > > it is only a couple of interface changes to the private method and > there is no risk in the container. > > I could do that, but it would muddle up the signatures of a bunch of JC > methods with unnecessary args, especially since balance() would need to take > both the LocalityManager AND the TaskAssignmentManager as arguments (see > explanation in my original reply). Both pieces of locality info could be > needed for some implementations of balance. Since they're both locality information, I > think it's reasonable to have both in the class called LocalityManager and > simplify a bunch of method/interface signatures. If there's significant > enough risk of the TaskAssignmentManager being used in the containers, I > think the compromise is passing it into the LocalityManager constructor from > the JC, but not in the container. > > In any case, we are always one code change away from someone using > TaskAssignmentManager in the containers, no matter how easy or hard it is. > That's why we have code reviews! :-) > > Navina Ramesh wrote: > *Can you provide examples of the historical problems caused by opening > read & write access to the coordinator stream? Without them, I disagree that it's > a poor design choice, especially if one agrees that the coordinator stream > should only be written centrally.* > My reference to a poor design choice was with regard to putting a "manager" > for accessing a stream, without being able to scope the level of access to > read and write. > > If features in SAMZA-871 are available, it makes sense to move the > "write" of locality to the JC/AM. > > *That's why we have code reviews!* > It gets profoundly hard to understand design choices when the developer > who made the choice is no longer active in the code reviews. :) It's better > to keep the changes simple and intuitive.
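The two concerns above (scoping a manager's access to read vs. write, and Jake's compromise of passing the TaskAssignmentManager into the LocalityManager constructor from the JC but not in the container) can be sketched together. This is a minimal illustration under stated assumptions: an in-memory map stands in for the coordinator stream, and `TaskAssignmentReader`, `InMemoryTaskAssignmentManager`, `LocalityManagerSketch`, and their methods are hypothetical names, not Samza's API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical read-only view of the task-to-container mapping. */
interface TaskAssignmentReader {
  Map<String, String> readTaskAssignment();
}

/** Hypothetical read-write manager; in this sketch only the job coordinator constructs it. */
final class InMemoryTaskAssignmentManager implements TaskAssignmentReader {
  // Stand-in for the coordinator stream; a real manager would send and bootstrap messages instead.
  private final Map<String, String> taskToContainer = new HashMap<>();

  @Override
  public Map<String, String> readTaskAssignment() {
    // Return an unmodifiable copy so holders of the reader type cannot mutate the mapping.
    return Collections.unmodifiableMap(new HashMap<>(taskToContainer));
  }

  void writeTaskContainerMapping(String taskName, String containerId) {
    taskToContainer.put(taskName, containerId);
  }
}

/** Hypothetical LocalityManager shape: the JC passes the task manager in, a container does not. */
final class LocalityManagerSketch {
  private final TaskAssignmentReader taskAssignment; // null when constructed inside a container

  /** Container-side constructor: no access to the task-to-container mapping at all. */
  LocalityManagerSketch() {
    this(null);
  }

  /** Job-coordinator-side constructor. */
  LocalityManagerSketch(TaskAssignmentReader taskAssignment) {
    this.taskAssignment = taskAssignment;
  }

  /** Exposes only the read-only type, so code reached through the LocalityManager cannot write. */
  Optional<TaskAssignmentReader> getTaskAssignment() {
    return Optional.ofNullable(taskAssignment);
  }
}
```

In this shape the JC keeps the concrete read-write manager for persisting new assignments, while anything obtained through the LocalityManager sees only the read-only interface, and a container-side instance sees nothing at all.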
It seems like there are a lot of misconceptions and stylistic disagreements that are inhibiting this review. Let's discuss in person. - Jake ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/45144/#review125481 ----------------------------------------------------------- On March 25, 2016, 5:30 p.m., Jake Maes wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/45144/ > ----------------------------------------------------------- > > (Updated March 25, 2016, 5:30 p.m.) > > > Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan > (Data Infrastructure). > > > Repository: samza > > > Description > ------- > > Persist the task-to-container mapping in the coordinator stream and use it to > minimize the reassignment when the container count changes. > > A new BalancingTaskNameGrouper interface exposes the balance() method, which > can be implemented pluggably. > GroupByContainerCount has been rewritten in Java and the balance() > functionality added. This is because the balance logic is specific to a > grouper. > > Detailed changes: > import-control.xml - Update imports that weren't needed in Scala and add some > for tests. > LocalityManager.java - Add TaskAssignmentManager. This mostly just keeps the > JobCoordinator code cleaner, but also associates the 2 managers for Host > Affinity info. > GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements > BalancingTaskNameGrouper > GroupByContainerCountFactory - Rewritten in Java > TestStorageRecovery - Old test depended on the order of the partitions in a > container. Now it doesn't. > TestGroupByContainerCount - Rewritten in Java and lots of tests added for > balance() > BalancingTaskNameGrouper - New interface for the balance method. Exposes the
Exposes the > new functionality without breaking backward compatibility > TaskAssignmentManager - Coordinator stream manager for task-to-container > mapping > TaskNameGroupBalancer - Bridges the task mapping (balance) capability with > taskname groupers, old and new > SetTaskContainerMapping - Coordinator strem message for the task-to-container > mapping > > > Diffs > ----- > > checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java > acf93525ea5c97df187bbe7977e2ae9fea65b32b > > samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java > 211b64215f26db49cd4411ff3fb41231145307d7 > > samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java > 4d093b500b7f3b582446634ced3e9d0b28371616 > > samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java > PRE-CREATION > > samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala > cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1 > > samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala > 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6 > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala > 06a96ad6ed786c22924017f894413bfa1ea34c06 > > samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java > PRE-CREATION > > 
samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java > e0d4aa1016d61ce328d7ff74b58f7b8f7682f386 > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java > 429573b480112c7491303dc410d78f37a308c4a7 > samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java > 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc > > samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala > 6e9c6fa579a5901000bea0601c771783d8334f0e > > Diff: https://reviews.apache.org/r/45144/diff/ > > > Testing > ------- > > A bunch of new unit tests have been added. > > Also tested with a test job. The task mapping (initially missing) is added > the first time the job is run. It is then used as expected to reduce task > reassignment as the container count was adjusted from 4->3->5 on subsequent > runs. > > > Thanks, > > Jake Maes > >
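The description says the task-to-container mapping is persisted in the coordinator stream, and the testing notes say it is missing on the first run and reused on later runs. One way to picture that, assuming the coordinator stream behaves like a keyed log whose messages are replayed at startup with the latest message per key winning, is the small in-memory model below. `TaskContainerMappingMessage` and `CoordinatorStreamLogSketch` are hypothetical stand-ins for SetTaskContainerMapping and the coordinator stream, not Samza classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a SetTaskContainerMapping message: key = task name, value = container id. */
final class TaskContainerMappingMessage {
  final String taskName;
  final String containerId;

  TaskContainerMappingMessage(String taskName, String containerId) {
    this.taskName = taskName;
    this.containerId = containerId;
  }
}

/** Hypothetical in-memory coordinator stream: an append-only log that is replayed on startup. */
final class CoordinatorStreamLogSketch {
  private final List<TaskContainerMappingMessage> log = new ArrayList<>();

  /** Appends a mapping message; earlier messages for the same task are kept but later ones win. */
  void send(TaskContainerMappingMessage message) {
    log.add(message);
  }

  /** Replays every message in order; the most recent message for a task name wins. */
  Map<String, String> bootstrapTaskAssignment() {
    Map<String, String> latest = new TreeMap<>();
    for (TaskContainerMappingMessage m : log) {
      latest.put(m.taskName, m.containerId);
    }
    return Collections.unmodifiableMap(latest);
  }
}
```

In this model, the first bootstrap returns an empty map (the "initially missing" case), so the grouper would fall back to a fresh assignment and write it; on later runs, only tasks that actually change containers need a new message, since every unchanged entry is recovered by the replay.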