-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51250/#review146427
-----------------------------------------------------------



@Xiang, thanks for putting up this long-wanted improvement! I mainly have one 
high-level comment: when updating the job model, we need to make sure that 
containers w/ the old job model do not double-process the same partitions as 
containers w/ the new job model. Thanks!
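To make the invariant concrete, here is a minimal Java sketch of a check that lets a new container start only when none of its partitions is still owned by a running container from the old job model. All names are hypothetical, and containers and partitions are plain strings rather than Samza's own types.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: container IDs and partitions are plain strings here,
// standing in for Samza's real container and SystemStreamPartition types.
final class JobModelOverlapCheck {
  private JobModelOverlapCheck() {}

  /**
   * Returns the IDs of new-model containers that own no partition still held by
   * a running old-model container. Starting any other new container before its
   * old owners have stopped could double-process a partition.
   */
  static Set<String> safeToStart(
      Map<String, Set<String>> runningOldContainers,
      Map<String, Set<String>> newContainers) {
    // Union of all partitions still owned by old containers that are running.
    Set<String> stillOwned = new HashSet<>();
    for (Set<String> partitions : runningOldContainers.values()) {
      stillOwned.addAll(partitions);
    }
    // Sorted for deterministic output.
    Set<String> safe = new TreeSet<>();
    for (Map.Entry<String, Set<String>> entry : newContainers.entrySet()) {
      if (Collections.disjoint(entry.getValue(), stillOwned)) {
        safe.add(entry.getKey());
      }
    }
    return safe;
  }
}
```

With an old container still owning `topic-0` and `topic-1`, only a new container assigned exclusively other partitions passes the check; once no old container is running, every new container passes.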


samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java (line 21)
<https://reviews.apache.org/r/51250/#comment212856>

    nit: generally, we try to avoid wildcard imports (import w/ "*"). Any 
reason you use a "*" here?



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 125)
<https://reviews.apache.org/r/51250/#comment212865>

    This section of code is an exact copy of lines 82-93. It would be better 
to extract it into a common function instead of copying the code.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 148)
<https://reviews.apache.org/r/51250/#comment213051>

    Why not pass in the oldJobModel's changelog partition mapping? Ideally, 
from line 139 onward, the code should be almost the same as in 
initializeJobModel(). It would be better to use a common function 
    
    createJobModel(config: Config, 
      changelogManager: ChangelogPartitionManager,
      localityManager: LocalityManager,
      streamMetadataCache: StreamMetadataCache,
      previousChangelogPartition: util.HashMap[TaskName, Integer])
      
    to avoid copying the code.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 165)
<https://reviews.apache.org/r/51250/#comment212863>

    What's the reason to *always* set it to oldest here? This would override 
the default behavior the user may have configured in the old config. I am 
curious about the rationale behind it.
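If the intent is only to choose a starting offset when none is configured, one alternative is to treat "oldest" as a fallback and leave an explicit user setting alone. A minimal sketch, assuming the setting involved is Samza's `systems.<system>.samza.offset.default` (whether the patch touches exactly this key is an assumption) and modeling the config as a plain `Map`; `OffsetDefaultSketch` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: config is modeled as a plain Map<String, String>. The key
// format follows Samza's systems.<system>.samza.offset.default setting, but
// whether that is the setting the patch overrides is an assumption.
final class OffsetDefaultSketch {
  private OffsetDefaultSketch() {}

  static String offsetDefaultKey(String system) {
    return "systems." + system + ".samza.offset.default";
  }

  /**
   * Returns a copy of the config in which "oldest" is used only as a fallback:
   * an offset default the user configured explicitly is left untouched, and
   * the input map itself is never modified.
   */
  static Map<String, String> withOldestFallback(Map<String, String> config, String system) {
    Map<String, String> updated = new HashMap<>(config);
    updated.putIfAbsent(offsetDefaultKey(system), "oldest");
    return updated;
  }
}
```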



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 284)
<https://reviews.apache.org/r/51250/#comment213053>

    When the input partition count changes, the number of tasks may change, 
and so may the changelog partition mapping. I think that we should still 
update the changelog partition mapping.
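A minimal sketch of such an update, assuming existing tasks keep their changelog partitions and new tasks receive ids above the largest one already in use. Task names are plain strings, and `ChangelogMappingSketch` is a hypothetical helper, not the actual JobCoordinator code.

```java
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: task names are plain strings rather than Samza's TaskName.
final class ChangelogMappingSketch {
  private ChangelogMappingSketch() {}

  /**
   * Extends the previous changelog partition mapping to cover the current tasks.
   * Existing tasks keep their partition, so their state stays where it was; new
   * tasks get fresh ids above the largest id used so far.
   */
  static Map<String, Integer> updateMapping(
      Map<String, Integer> previousMapping, Collection<String> currentTasks) {
    int next = previousMapping.values().stream()
        .mapToInt(Integer::intValue).max().orElse(-1) + 1;
    // Start from the previous mapping so no existing assignment is ever changed.
    Map<String, Integer> updated = new TreeMap<>(previousMapping);
    // Visit tasks in sorted order so new ids are assigned deterministically.
    for (String task : new TreeSet<>(currentTasks)) {
      if (!updated.containsKey(task)) {
        updated.put(task, next++);
      }
    }
    return updated;
  }
}
```

Entries for tasks that disappear are kept on purpose in this sketch, so their changelog partitions (and the state in them) are never handed to a different task.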



samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java (line 488)
<https://reviews.apache.org/r/51250/#comment213077>

    The concern here is:
    
    1. When stopContainer() fails after all 3 retries, the old container may 
still be running. This leaves the job running in a mixed mode: some containers 
run w/ the old job model and some w/ the new job model, which is bad and could 
lead to double-processing or a missed partition.
    
    The safer solution seems to be:
    1. Throw an exception if stopContainer() fails after all retries.
    2. Do not start new containers until all old containers are killed.
    
    There are multiple ways to implement 2:
    a) Lock the container allocator in JobCoordinator until all old containers 
are killed. It is easier, but the downside is a longer pause for the job.
    b) Allow the containers to start in a "paused" mode on JobModel change, 
until all old containers are killed by the JobCoordinator. We would have to 
implement a push mechanism from the JobCoordinator to notify all containers to 
"resume operation". It is more complex, but enables finer-grained control, so 
that some new containers can start earlier once a subset of old containers are 
killed (i.e., new containers that only process partitions not processed by any 
remaining old container can start immediately).
    
    I would recommend going w/ option a) first, and gradually moving toward b).
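A minimal Java sketch of option a) combined with point 1 above: each old container is stopped with a bounded number of attempts, an exception is thrown if they all fail, and the allocator waits on a latch before launching any container of the new job model. All names are hypothetical, and `stopContainer` is modeled as a `Predicate<String>` that reports whether the stop succeeded.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical sketch of option a); none of these names are existing Samza APIs.
final class RestartGateSketch {
  // Opens only after every old container has been stopped.
  private final CountDownLatch oldContainersStopped;

  RestartGateSketch(int oldContainerCount) {
    this.oldContainersStopped = new CountDownLatch(oldContainerCount);
  }

  /** Tries to stop one old container up to maxAttempts times; throws if all attempts fail. */
  void stopOrThrow(String containerId, Predicate<String> stopContainer, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (stopContainer.test(containerId)) {
        oldContainersStopped.countDown();
        return;
      }
    }
    throw new IllegalStateException(
        "Failed to stop container " + containerId + " after " + maxAttempts + " attempts");
  }

  /**
   * Called by the allocator before launching any container of the new job model.
   * Returns false if the old containers were not all stopped within the timeout.
   */
  boolean awaitOldContainersStopped(long timeout, TimeUnit unit) {
    try {
      return oldContainersStopped.await(timeout, unit);
    } catch (InterruptedException e) {
      // Preserve the interrupt status for callers higher up the stack.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Here the latch is counted down when the stop call reports success; in YARN the more reliable signal would likely be the callback confirming that the container has exited. The pause the job sees is bounded by the slowest old container to stop, which is the downside noted for option a).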


- Yi Pan (Data Infrastructure)


On Aug. 19, 2016, 7:20 p.m., Xiang Fu wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51250/
> -----------------------------------------------------------
> 
> (Updated Aug. 19, 2016, 7:20 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Support container restart when the partition count changes.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java 8652465ca6764bc69a2767bc3a4f436ca9f8a401
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java dbd6dcc41644e9ea1e6a12dcd18f44ef2e63bc72
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java 96d3d7cc2853356a338dc25067f01440c938e216
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java dacc52de0a34498a715a299bc69c95aebd1b08ba
> 
> Diff: https://reviews.apache.org/r/51250/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Xiang Fu
> 
>
