[ https://issues.apache.org/jira/browse/BEAM-6077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16697536#comment-16697536 ]
Jozef Vilcek commented on BEAM-6077:
------------------------------------
OperatorStateStore is already used by the wrapper. It seems good enough to make
the initial splits sufficiently granular, and the source wrapper then works fine.
The only drawback is that, in the case of e.g. Kafka, the running structure was

  UnboundedSourceWrapper -> LocalSplitReader / KafkaUnboundedReader -> N partitions to drain

and now it will be

  UnboundedSourceWrapper -> N LocalSplitReader / KafkaUnboundedReader instances -> 1 partition each to drain

I do not see a problem right away, but it is not easy to reason about because the
code is quite complex. It will spin up some more resources by having more Beam
Kafka readers.
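
To make the structural change above concrete, here is a minimal sketch of
round-robin assignment of Kafka partitions to a desired number of initial splits.
SplitSketch and split() are hypothetical names for illustration, not Beam API, and
the round-robin scheme is an assumption. Once desiredSplits is at least the
partition count, each split, and therefore each reader inside
UnboundedSourceWrapper, owns exactly one partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: distribute partitions round-robin over initial splits.
class SplitSketch {
    static List<List<Integer>> split(List<Integer> partitions, int desiredSplits) {
        // Never create more splits than partitions, and always at least one split.
        int n = Math.max(1, Math.min(desiredSplits, partitions.size()));
        List<List<Integer>> splits = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            splits.add(new ArrayList<>());
        }
        // Partition i goes to split (i mod n).
        for (int i = 0; i < partitions.size(); i++) {
            splits.get(i % n).add(partitions.get(i));
        }
        return splits;
    }
}
```

With 5 partitions and desiredSplits = 2 this yields two readers draining
[0, 2, 4] and [1, 3]; with desiredSplits = 10 it yields five readers with one
partition each, which is the "more Beam Kafka readers" trade-off described above.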
I have briefly looked at Flink's Kafka consumer. It seems to operate somewhat
differently in general. I could not find any step that makes an initial
splitting decision the way Beam does. It looks like all partitions are visible
to each source instance through partition discovery, and the fetcher is then
assigned only the ones relevant for the current indexOfThisSubtask(). The
fetcher seems to act similarly to KafkaUnboundedReader (capable of reading N
partitions), but in the end it produces fine-grained checkpoint state,
Map[Partition, Offset], whereas Beam does not. Without changing the contract
between UnboundedSource <-> Reader <-> CheckpointMark, I do not see a way to
optimize it.
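
The benefit of fine-grained Map[Partition, Offset] state can be illustrated with
a simplified sketch: after a parallelism change, each subtask scans the restored
per-partition entries and keeps only those it owns. RescaleSketch, ownerOf() and
restore() are hypothetical names, and the plain modulo owner function is an
assumption; Flink's actual partition-to-subtask assignment differs in detail.
Beam's CheckpointMark is opaque per split, so this kind of per-partition
filtering is not possible without the contract change mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: redistribute per-partition offsets on rescale.
class RescaleSketch {
    // Assumed owner function: partition p belongs to subtask (p mod parallelism).
    static int ownerOf(int partition, int parallelism) {
        return partition % parallelism;
    }

    // Keep only the offsets this subtask is responsible for under the new parallelism.
    static Map<Integer, Long> restore(Map<Integer, Long> allOffsets, int subtaskIndex, int parallelism) {
        Map<Integer, Long> mine = new HashMap<>();
        for (Map.Entry<Integer, Long> e : allOffsets.entrySet()) {
            if (ownerOf(e.getKey(), parallelism) == subtaskIndex) {
                mine.put(e.getKey(), e.getValue());
            }
        }
        return mine;
    }
}
```

For example, offsets {0=10, 1=20, 2=30, 3=40} restored with parallelism 2 give
subtask 0 {0=10, 2=30} and subtask 1 {1=20, 3=40}, regardless of how the
partitions were grouped before the rescale.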
> Make UnboundedSource state rescale friendly
> -------------------------------------------
>
> Key: BEAM-6077
> URL: https://issues.apache.org/jira/browse/BEAM-6077
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Affects Versions: 2.8.0
> Reporter: Jozef Vilcek
> Assignee: Jozef Vilcek
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> FlinkRunner's UnboundedSourceWrapper currently does not rescale its state
> well when the job changes parallelism.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)