[
https://issues.apache.org/jira/browse/SAMZA-2794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hunter Moffitt updated SAMZA-2794:
----------------------------------
Description:
When Standby Partitions are disabled and the Job is redeployed, a new set of
coordinator stream messages is written for the "set-task-partition-assignment"
namespace without the standby partitions listed in them. The keys of these
messages include a field added by the Samza elasticity changes:
"keyBucket":-1. The original task-partition assignment messages in the
coordinator stream do not contain this field in the key.
Because of the addition to the key, both the prior partition assignments and
the new partition assignments are read as separate entries. Depending on the
order in which the entries are processed, the mapping is overwritten when the
second entry for the same SystemStreamPartition is processed. The result is
inconsistent mappings: some SystemStreamPartitions contain Standby partition
task names, while others contain only the active Partitions as task names.
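The overwrite described above can be illustrated with a minimal sketch. It is
not Samza code: the Entry class, the key strings, the "input-0" label, and the
"Standby-Partition 0-0" task name are all hypothetical and only mimic the
shape of the problem (same SystemStreamPartition, two different serialized
keys).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyMismatchSketch {
    // Hypothetical stand-in for one "set-task-partition-assignment" message.
    static final class Entry {
        final String rawKey;          // serialized key as stored in the stream (illustrative format)
        final String ssp;             // the SystemStreamPartition the key refers to
        final List<String> taskNames; // task names assigned to that SSP

        Entry(String rawKey, String ssp, List<String> taskNames) {
            this.rawKey = rawKey;
            this.ssp = ssp;
            this.taskNames = taskNames;
        }
    }

    // Keyed by the raw serialized key: the old and new variants never collide.
    static Map<String, List<String>> byRawKey(List<Entry> entries) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Entry e : entries) {
            out.put(e.rawKey, e.taskNames);
        }
        return out;
    }

    // Keyed by SSP: whichever entry is processed last overwrites the earlier one.
    static Map<String, List<String>> bySsp(List<Entry> entries) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Entry e : entries) {
            out.put(e.ssp, e.taskNames);
        }
        return out;
    }

    // Written before the redeploy: no keyBucket field, standby task still listed.
    static final Entry OLD = new Entry("{\"partition\":0}", "input-0",
            List.of("Partition 0", "Standby-Partition 0-0"));
    // Written after the redeploy: keyBucket field present, active task only.
    static final Entry NEW = new Entry("{\"partition\":0,\"keyBucket\":-1}", "input-0",
            List.of("Partition 0"));

    public static void main(String[] args) {
        System.out.println(byRawKey(List.of(OLD, NEW)).size());
        System.out.println(bySsp(List.of(OLD, NEW)));
        System.out.println(bySsp(List.of(NEW, OLD)));
    }
}
```

In this sketch the final mapping for an SSP depends only on which of the two
entries happens to be processed last, which is how a stale standby task name
can survive for some SSPs and not for others.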
In JobModelHelper, where the standby partitions would be filtered out by
comparing against the taskMode mapping, the taskMode mapping does not contain
information about any standby partitions, so they fall under the default case
of !taskModes.containsKey(taskName), move on to the GrouperMetaData, and
are then counted by the SSPGrouperProxy.
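A rough sketch of the filtering behavior described above, assuming the filter
keeps any task name that is either absent from the taskMode mapping or marked
active. This is a reconstruction from the description, not the actual
JobModelHelper code; the Mode enum, the keepNonStandby helper, and the task
names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class TaskModeFilterSketch {
    // Hypothetical stand-in for the task mode recorded per task name.
    enum Mode { ACTIVE, STANDBY }

    // Keeps a task name unless the mapping explicitly marks it as standby.
    // A name that is missing from the mapping takes the default branch and is kept.
    static List<String> keepNonStandby(List<String> taskNames, Map<String, Mode> taskModes) {
        List<String> kept = new ArrayList<>();
        for (String name : taskNames) {
            if (!taskModes.containsKey(name) || taskModes.get(name) == Mode.ACTIVE) {
                kept.add(name);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> names = List.of("Partition 0", "Standby-Partition 0-0");
        // Standby disabled: the mapping no longer mentions the stale standby task.
        System.out.println(keepNonStandby(names, Map.of("Partition 0", Mode.ACTIVE)));
        // Standby enabled: the mapping knows the task is standby and filters it.
        System.out.println(keepNonStandby(names,
                Map.of("Partition 0", Mode.ACTIVE, "Standby-Partition 0-0", Mode.STANDBY)));
    }
}
```

Under this assumption, a stale standby task name is only filtered while the
mapping still knows about it; once standby is disabled it looks like any
other unknown task and passes through.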
Because of the additional partitions being counted, the SSPGrouperProxy acts as
if the input topic was repartitioned and throws an error stating "New
partition count: {} should be a multiple of previous partition count: {}.",
even though neither the input topic partitions nor their task mappings have
changed.
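The kind of check that produces this message can be sketched as a simple
divisibility test. This is an assumption about the shape of the check, not
the SSPGrouperProxy source: the checkMultiple helper, the
IllegalStateException type, and the counts are hypothetical, and the example
arbitrarily assumes the stale standby entries inflate the previous count.

```java
class PartitionCountCheckSketch {
    // Rejects a new partition count that is not a whole multiple of the previous one.
    static void checkMultiple(int previousCount, int newCount) {
        if (previousCount > 0 && newCount % previousCount != 0) {
            throw new IllegalStateException(String.format(
                    "New partition count: %d should be a multiple of previous partition count: %d.",
                    newCount, previousCount));
        }
    }

    public static void main(String[] args) {
        // Unchanged topic, consistent bookkeeping: the check passes silently.
        checkMultiple(4, 4);
        try {
            // Same topic, but two stale entries were counted on the "previous" side.
            checkMultiple(6, 4);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is that the check cannot distinguish a real
repartitioning from a miscount: any stale entry that changes one side of the
comparison is enough to trip it.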
was: When Standby Partitions are disabled and the Job is redeployed, a new set
of coordinator stream messages is written for the
"set-task-partition-assignment" namespace without the standby partitions listed
in them. The keys of these messages include a field added by the Samza
elasticity changes: "keyBucket":-1. Because of the addition to the key, both
the prior partition assignments and the new partition assignments are read as
separate entries. Depending on the order in which the entries are processed,
the mapping is overwritten when the second entry for the same
SystemStreamPartition is processed, resulting in inconsistent mappings with
some of the SystemStreamPartitions containing Standby partition tasknames and
others only containing the current Partitions as tasknames. In JobModelHelper,
where the standby partitions would be filtered out by comparing against the
taskMode mapping, the taskMode mapping does not contain information about any
standby partitions, so they fall under the default case of
!taskModes.containsKey(taskName), move on to the GrouperMetaData, and are then
counted by the SSPGrouperProxy.
> SSPGrouperProxy Partition Count Change Error
> --------------------------------------------
>
> Key: SAMZA-2794
> URL: https://issues.apache.org/jira/browse/SAMZA-2794
> Project: Samza
> Issue Type: Bug
> Reporter: Hunter Moffitt
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)