Jake Maes created SAMZA-947:
-------------------------------
Summary: TaskAssignmentManager registration exception when partition count changes.
Key: SAMZA-947
URL: https://issues.apache.org/jira/browse/SAMZA-947
Project: Samza
Issue Type: Bug
Affects Versions: 0.10.1
Reporter: Jake Maes
Assignee: Jake Maes
Priority: Minor
The GroupByContainerCount grouper (the class that appears in the stack trace
below) deletes the persisted task mapping when the partition count has changed,
because there may be fewer tasks, which would make the old mapping invalid.
To delete the mapping, the TaskAssignmentManager registers itself and writes
null for all the keys. Later, when the recalculated mapping is saved, it tries
to register itself a second time, which causes this exception:
Exception in thread "main" org.apache.samza.SamzaException: SamzaTaskAssignmentManager is already registered with the queuing system producer
    at org.apache.samza.system.kafka.KafkaSystemProducer.register(KafkaSystemProducer.scala:65)
    at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.register(CoordinatorStreamSystemProducer.java:72)
    at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.registerCoordinatorStreamProducer(AbstractCoordinatorStreamManager.java:100)
    at org.apache.samza.container.grouper.task.TaskAssignmentManager.register(TaskAssignmentManager.java:58)
    at org.apache.samza.container.grouper.task.GroupByContainerCount.saveTaskAssignments(GroupByContainerCount.java:179)
    at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:93)
    at org.apache.samza.coordinator.JobCoordinator$.refreshJobModel(JobCoordinator.scala:255)
    at org.apache.samza.coordinator.JobCoordinator$.jobModelGenerator$1(JobCoordinator.scala:187)
    at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:193)
    at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:120)
    at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:104)
    at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:74)
    at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
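The double registration described above can be reproduced in isolation. The
following is a minimal sketch and not Samza code: SketchProducer and
SketchAssignmentManager are hypothetical stand-ins for the real producer and
manager, and the once-only guard is just one possible way to make a second
register() call harmless. It is not necessarily the fix adopted for this issue.

```java
import java.util.HashSet;
import java.util.Set;

class RegistrationSketch {

    /** Hypothetical stand-in for a producer that rejects a duplicate source. */
    static class SketchProducer {
        private final Set<String> sources = new HashSet<>();

        void register(String source) {
            // Set.add returns false when the source is already present.
            if (!sources.add(source)) {
                throw new IllegalStateException(source + " is already registered");
            }
        }
    }

    /** Hypothetical manager that forwards registration to the producer at most once. */
    static class SketchAssignmentManager {
        private final SketchProducer producer;
        private final String source;
        private boolean registered = false;

        SketchAssignmentManager(SketchProducer producer, String source) {
            this.producer = producer;
            this.source = source;
        }

        void register() {
            if (registered) {
                return; // already registered, so the second call is a no-op
            }
            producer.register(source);
            registered = true;
        }
    }

    /** Registers the same source twice directly, as in the reported failure. */
    static boolean duplicateRegistrationFails() {
        SketchProducer producer = new SketchProducer();
        producer.register("SamzaTaskAssignmentManager");
        try {
            producer.register("SamzaTaskAssignmentManager");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Registers twice through the guarded manager: once to delete, once to save. */
    static boolean guardedRegistrationSucceeds() {
        SketchAssignmentManager manager =
            new SketchAssignmentManager(new SketchProducer(), "SamzaTaskAssignmentManager");
        manager.register(); // delete path
        manager.register(); // save path
        return true;        // reached only if neither call threw
    }

    public static void main(String[] args) {
        System.out.println("duplicate registration fails: " + duplicateRegistrationFails());
        System.out.println("guarded registration succeeds: " + guardedRegistrationSucceeds());
    }
}
```

In this sketch the guard lives in the manager, so callers on both the delete
path and the save path can call register() without knowing whether the other
path has already run.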
In a YARN environment, the AM restarts and, because the task mapping has
already been deleted, the second attempt to save the mapping succeeds.
Since this issue occurs only when the partition count changes and is
recoverable, I'm marking it as low priority.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)