-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51250/#review146427
-----------------------------------------------------------


@Xiang, thanks for putting up this long-wanted improvement! I have one main high-level comment: when updating the job model, we need to make sure that containers running with the old job model do not process the same partitions as containers running with the new job model. Thanks!


samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java (line 21)
<https://reviews.apache.org/r/51250/#comment212856>

    nit: we generally try to avoid wildcard ("*") imports. Is there a reason to use one here?


samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 125)
<https://reviews.apache.org/r/51250/#comment212865>

    This section of code is an exact copy of lines 82-93. It would be better to extract it into a common function instead of copying the code.


samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 148)
<https://reviews.apache.org/r/51250/#comment213051>

    Why not pass in the oldJobModel's changelog partition mapping? Ideally, from line 139 onward, the code should be almost the same as in initializeJobModel(). It would be better to use a common function

    createJobModel(config: Config, changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, streamMetadataCache: StreamMetadataCache, previousChangelogPartition: util.HashMap[TaskName, Integer])

    to avoid copying the code.


samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 165)
<https://reviews.apache.org/r/51250/#comment212863>

    What's the reason to *always* set this to oldest here? It would override the default behavior that the user may have configured in the old config. What is the rationale behind it?


samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 284)
<https://reviews.apache.org/r/51250/#comment213053>

    When the input partition count changes, the number of tasks may change, and so the changelog partition map may change as well. I think that we should still update the changelog partition mapping.

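    To make the line 284 comment concrete, here is a minimal Java sketch of one way the mapping could be refreshed. It is an illustration under assumptions, not the code in JobCoordinator.scala: the class ChangelogMappingSketch, the method updateChangelogMapping, and the use of plain String task names in place of TaskName are all hypothetical. It assumes that existing tasks must keep their changelog partitions, so their state stays readable, and that new tasks simply take the next unused ids.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of Samza.
class ChangelogMappingSketch {

    /**
     * Returns a new task-to-changelog-partition mapping that keeps every
     * assignment from the previous job model and gives each task that has
     * no assignment yet the next unused partition id.
     */
    static Map<String, Integer> updateChangelogMapping(
            Map<String, Integer> previous, Collection<String> currentTasks) {
        // Start from the old mapping so existing tasks keep their partitions.
        Map<String, Integer> updated = new HashMap<>(previous);

        // Find the highest partition id already in use (-1 if none).
        int maxId = -1;
        for (int partitionId : previous.values()) {
            maxId = Math.max(maxId, partitionId);
        }
        int nextId = maxId + 1;

        // Iterate in sorted order so the assignment is deterministic.
        for (String task : new TreeSet<>(currentTasks)) {
            if (!updated.containsKey(task)) {
                updated.put(task, nextId++);
            }
        }
        return updated;
    }
}
```

    The sketch deliberately leaves entries for tasks that no longer exist in place; whether those should be dropped is a separate question for the patch.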

samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java (line 488)
<https://reviews.apache.org/r/51250/#comment213077>

    The concern here is that when stopContainer() fails after all 3 retries, the old container may still be running. That leaves the job running in a mixed mode: some containers running with the old job model and some with the new job model, which is bad and could lead to double processing or a missed partition.

    The safer solution seems to be:
    1. throw an exception if stopContainer() fails after all retries.
    2. do not start new containers until all old containers are killed.

    There are multiple ways to implement 2:
    a) lock the container allocator in JobCoordinator until all old containers are killed. This is easier, but the downside is that the job will have a longer pause time.
    b) allow the containers to start in a "paused" mode on a JobModel change, until all old containers are killed by the JobCoordinator. We would have to implement a push mechanism from the JobCoordinator to notify all containers to "resume operation". This is more complex, but it enables finer-grained control, so that some new containers can start earlier when only a subset of old containers has been killed (i.e. all new containers processing partitions that are not processed by old containers can start immediately).

    I would recommend going with option a) first and gradually moving toward b).


- Yi Pan (Data Infrastructure)


On Aug. 19, 2016, 7:20 p.m., Xiang Fu wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51250/
> -----------------------------------------------------------
>
> (Updated Aug. 19, 2016, 7:20 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> Support container restart when partition count diff happens.
>
>
> Diffs
> -----
>
>   samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java 8652465ca6764bc69a2767bc3a4f436ca9f8a401
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java dbd6dcc41644e9ea1e6a12dcd18f44ef2e63bc72
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java 96d3d7cc2853356a338dc25067f01440c938e216
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java dacc52de0a34498a715a299bc69c95aebd1b08ba
>
> Diff: https://reviews.apache.org/r/51250/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Xiang Fu
>
>