Jake Maes created SAMZA-947:
-------------------------------

             Summary: TaskAssignmentManager registration exception when partition count changes.
                 Key: SAMZA-947
                 URL: https://issues.apache.org/jira/browse/SAMZA-947
             Project: Samza
          Issue Type: Bug
    Affects Versions: 0.10.1
            Reporter: Jake Maes
            Assignee: Jake Maes
            Priority: Minor


The GroupByContainerCount grouper deletes the persisted task mapping if the 
partition count has changed, because there may be fewer tasks, which would 
make the old mapping invalid. 

To delete the mapping, the TaskAssignmentManager registers itself and writes 
null for all the keys. Later, when the recalculated mapping is saved, it tries 
to register itself again, which causes this exception:
Exception in thread "main" org.apache.samza.SamzaException: SamzaTaskAssignmentManager is already registered with the queuing system producer
        at org.apache.samza.system.kafka.KafkaSystemProducer.register(KafkaSystemProducer.scala:65)
        at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.register(CoordinatorStreamSystemProducer.java:72)
        at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.registerCoordinatorStreamProducer(AbstractCoordinatorStreamManager.java:100)
        at org.apache.samza.container.grouper.task.TaskAssignmentManager.register(TaskAssignmentManager.java:58)
        at org.apache.samza.container.grouper.task.GroupByContainerCount.saveTaskAssignments(GroupByContainerCount.java:179)
        at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:93)
        at org.apache.samza.coordinator.JobCoordinator$.refreshJobModel(JobCoordinator.scala:255)
        at org.apache.samza.coordinator.JobCoordinator$.jobModelGenerator$1(JobCoordinator.scala:187)
        at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:193)
        at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:120)
        at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:104)
        at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:74)
        at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
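
For illustration, the double registration can be reproduced with a minimal 
sketch. The classes below (StrictProducer, GuardedManager) are hypothetical 
stand-ins and not Samza classes. The guard shown is only one possible way to 
make registration idempotent, and not necessarily the fix that will be applied 
here.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a producer that rejects a source name
// that has already been registered.
class StrictProducer {
    private final Set<String> sources = new HashSet<>();

    void register(String source) {
        // Set.add returns false when the element is already present.
        if (!sources.add(source)) {
            throw new IllegalStateException(
                source + " is already registered with the producer");
        }
    }
}

// Hypothetical manager that remembers whether it has already registered,
// so a second register() call becomes a no-op instead of an error.
class GuardedManager {
    private final StrictProducer producer;
    private final String source;
    private boolean registered = false;

    GuardedManager(StrictProducer producer, String source) {
        this.producer = producer;
        this.source = source;
    }

    void register() {
        if (registered) {
            return; // already registered, nothing to do
        }
        producer.register(source);
        registered = true;
    }

    boolean isRegistered() {
        return registered;
    }
}
```

With this guard, the delete path and the later save path could both call 
register() on the same manager without the second call reaching the producer.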

In a YARN environment, the AM restarts and, since the task mapping has now 
been deleted, this second attempt to save the mapping succeeds. 

Since this issue only occurs when the partition count changes and is 
recoverable, I'm marking it as low priority.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
