lakshmi-manasa-g commented on code in PR #1599:
URL: https://github.com/apache/samza/pull/1599#discussion_r865101103
##########
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala:
##########
@@ -216,9 +216,19 @@ class OffsetManager(
* Set the last processed offset for a given SystemStreamPartition.
*/
def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
+ // without elasticity enabled, there is exactly one entry of an ssp in the systemStreamPartitions map for a taskName
+ // with elasticity enabled, there is exactly one of the keyBuckets of an ssp that a task consumes
+ // and hence exactly one entry of an ssp with keyBucket in the systemStreamPartitions map for a taskName
+ // hence from the given ssp, find its sspWithKeybucket for the task and use that for updating lastProcessedOffsets
+ val sspWithKeyBucket = systemStreamPartitions.getOrElse(taskName,
Review Comment:
Is the question:
(a) without elasticity, does this PR work with broadcast streams? or
(b) with elasticity enabled, does this PR work with broadcast streams?
For (b):
the expected behavior of broadcast streams may not hold with elasticity enabled. If a job uses a broadcast stream so that every task consumes every message in a broadcast SSP, because some processing logic in a task relies on all of those messages arriving at it, that logic could break: with elasticity, each task consumes only its own keyBucket of the broadcast SSP. For this reason, I will update the PR description (and the elasticity JIRA description) to state that elasticity does not support broadcast streams.
For (a), without elasticity: yes, it works.
This change relies on two facts: an SSP is present exactly once in a task's model (i.e., the list of SSPs the task consumes), and OffsetManager.update is called with the taskName and the SSP.
Even with broadcast streams, if a job consumes two partitions of a broadcast stream, each task consumes both partitions, but each partition is a separate SSP in the task's model (same system and stream, different partition).
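The invariant above can be written as a check over a job model. A minimal Scala sketch, assuming simplified hypothetical stand-ins (`Ssp`, and a `JobModel` that is just a `Map` from task name to SSPs) rather than Samza's real `SystemStreamPartition` and `JobModel` classes; `keyBucket = -1` is our own convention for "no elasticity":

```scala
// Hypothetical stand-in for Samza's SystemStreamPartition; keyBucket = -1
// is an assumed convention meaning "whole partition" (no elasticity).
final case class Ssp(system: String, stream: String, partition: Int, keyBucket: Int = -1)

object InvariantSketch {
  // Hypothetical job model: task name -> SSPs that task consumes.
  type JobModel = Map[String, Seq[Ssp]]

  // True if, within every task, no (system, stream, partition) appears more
  // than once, regardless of the keyBucket each entry carries.
  // The same SSP may still appear in several tasks (e.g. a broadcast SSP).
  def eachPartitionOncePerTask(model: JobModel): Boolean =
    model.values.forall { ssps =>
      val keys = ssps.map(s => (s.system, s.stream, s.partition))
      keys.distinct.size == keys.size
    }
}
```

With the hypothetical streams `input` and `broadcast`, a model where both tasks consume b0 and b1 still satisfies the check, because b0 and b1 differ in partition and the check is per task.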
Let me elaborate with an example to make this clearer.
Suppose the job does NOT have elasticity enabled. Say the job has one input stream with two partitions, i0 and i1, and a broadcast stream with two partitions, b0 and b1. Note that i0, i1, b0 and b1 are all SSPs: each carries system, stream and partition information.
Let's say the job uses the GroupByPartition SSP grouper. The job model will look like: Task0 consumes i0, b0 and b1; Task1 consumes i1, b0 and b1.
Prior to this change, update(taskName, ssp, offset) did:
  lastProcessedOffsets.get(taskName).put(ssp, offset)
Now, update(taskName, ssp, offset) does:
  fetchedSsp = the entry among the SSPs consumed by taskName whose system, stream and partition match the given ssp
  lastProcessedOffsets.get(taskName).put(fetchedSsp, offset)
So if update(Task0, i0, 1) comes in, the new code searches Task0's list <i0, b0, b1> for the SSP with a matching system, stream and partition, and finds i0. The same happens for update(Task0, b0, 2) on the broadcast input, and so on.
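A minimal Scala sketch of the new lookup, applied to the non-elastic example. It assumes hypothetical simplified classes (`TaskName`, `Ssp`, `OffsetUpdateSketch`), not Samza's real `OffsetManager`, and the fallback when no entry matches is our own assumption, since the quoted diff is cut off at `getOrElse(taskName,`:

```scala
import scala.collection.mutable

// Hypothetical stand-ins, not Samza's real classes.
final case class TaskName(name: String)
// keyBucket = -1 is an assumed convention meaning "whole partition" (no elasticity).
final case class Ssp(system: String, stream: String, partition: Int, keyBucket: Int = -1)

class OffsetUpdateSketch(systemStreamPartitions: Map[TaskName, Set[Ssp]]) {
  // task -> (SSP as it appears in the task's model -> last processed offset)
  val lastProcessedOffsets: mutable.Map[TaskName, mutable.Map[Ssp, String]] =
    mutable.Map.empty

  def update(taskName: TaskName, ssp: Ssp, offset: String): Unit = {
    // Find the single entry in the task's model with the same system, stream
    // and partition as the given ssp; its keyBucket may differ.
    val sspWithKeyBucket = systemStreamPartitions
      .getOrElse(taskName, Set.empty[Ssp])
      .find(s => s.system == ssp.system && s.stream == ssp.stream && s.partition == ssp.partition)
      // Assumption: fall back to the given ssp when nothing matches.
      .getOrElse(ssp)
    lastProcessedOffsets
      .getOrElseUpdate(taskName, mutable.Map.empty)
      .put(sspWithKeyBucket, offset)
  }
}
```

Without elasticity every entry has the default keyBucket, so update(Task0, i0, "1") and update(Task0, b0, "2") record offsets under i0 and b0 themselves, exactly as before the change.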
Now suppose this job enables elasticity with elasticity.factor = 2 and the same grouper. The job model will look like:
Task0_0_2 consumes <(i0,0), (b0,0), (b1,0)>, where (i0,0) means keyBucket 0 of SSP i0.
Task0_1_2 consumes <(i0,1), (b0,1), (b1,1)>, where (i0,1) means keyBucket 1 of SSP i0.
(Task1 is split the same way.)
Now, when Task0_0_2 finishes consuming, say, offset 4 of i0 (the non-broadcast input), update(Task0_0_2, i0, 4) is called. Looking up i0 in Task0_0_2's list of SSPs finds (i0,0), and that is the key added to lastProcessedOffsets.
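A minimal Scala sketch of the elastic case. The `split` helper is hypothetical: it only mirrors the Task0_0_2 / Task0_1_2 example above (each split task takes the same keyBucket of every SSP) and is not Samza's grouper code. It also shows why broadcast semantics break: no split task consumes all of b0.

```scala
// Hypothetical stand-in for Samza's SystemStreamPartition with a keyBucket;
// keyBucket = -1 is an assumed convention meaning "whole partition".
final case class Ssp(system: String, stream: String, partition: Int, keyBucket: Int = -1)

object ElasticSketch {
  // Hypothetical: split one non-elastic task into `factor` tasks named
  // <task>_<bucket>_<factor>, each consuming keyBucket `bucket` of every SSP.
  def split(task: String, ssps: Seq[Ssp], factor: Int): Map[String, Seq[Ssp]] =
    (0 until factor).map { bucket =>
      s"${task}_${bucket}_$factor" -> ssps.map(_.copy(keyBucket = bucket))
    }.toMap

  // The lookup update() relies on: match on system, stream and partition only,
  // ignoring the keyBucket of the incoming ssp.
  def sspWithKeyBucket(taskSsps: Seq[Ssp], ssp: Ssp): Option[Ssp] =
    taskSsps.find(s =>
      s.system == ssp.system && s.stream == ssp.stream && s.partition == ssp.partition)
}
```

Looking up the plain i0 in Task0_0_2's list yields keyBucket 0 of i0, and in Task0_1_2's list yields keyBucket 1, so each split task records offsets under its own bucket.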
The same reasoning applies to the other grouper, GroupBySystemStreamPartition.
--
This is an automated message from the Apache Git Service.