[ https://issues.apache.org/jira/browse/BEAM-6077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16697536#comment-16697536 ]

Jozef Vilcek commented on BEAM-6077:
------------------------------------

OperatorStateStore is already used by the wrapper. It seems to be enough to make 
the initial splits granular enough; with that, the source wrapper works fine. The 
only drawback is that in the case of, e.g., Kafka, the running structure was

    UnboundedSourceWrapper -> LocalSplitReader / KafkaUnboundedReader -> N partitions to drain

now it will be 

    UnboundedSourceWrapper -> N x LocalSplitReader / KafkaUnboundedReader -> 1 partition each to drain
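To make the difference between the two structures concrete, here is a minimal Java sketch. It assumes, for illustration only, that partitions are spread over splits round robin and that split i is handed to subtask i % parallelism; the names `SplitAssignmentSketch`, `Split`, `splitInto` and `localSplits` are hypothetical and not Beam's API.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitAssignmentSketch {

  // Hypothetical split: represented only by the Kafka partition indices it reads.
  record Split(List<Integer> partitions) {}

  // Divide partitions into `numSplits` groups, round robin.
  static List<Split> splitInto(int numPartitions, int numSplits) {
    List<List<Integer>> groups = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) groups.add(new ArrayList<>());
    for (int p = 0; p < numPartitions; p++) groups.get(p % numSplits).add(p);
    List<Split> splits = new ArrayList<>();
    for (List<Integer> g : groups) splits.add(new Split(g));
    return splits;
  }

  // Assumption: split i is read by the subtask with index i % numSubtasks.
  static List<Split> localSplits(List<Split> splits, int subtaskIndex, int numSubtasks) {
    List<Split> local = new ArrayList<>();
    for (int i = 0; i < splits.size(); i++) {
      if (i % numSubtasks == subtaskIndex) local.add(splits.get(i));
    }
    return local;
  }

  public static void main(String[] args) {
    int partitions = 8;
    // Coarse: 2 splits, each reader drains 4 partitions.
    List<Split> coarse = splitInto(partitions, 2);
    // Granular: one split per partition, each reader drains 1 partition.
    List<Split> fine = splitInto(partitions, partitions);
    for (int parallelism : new int[] {2, 4}) {
      for (int s = 0; s < parallelism; s++) {
        System.out.printf("p=%d subtask=%d coarse=%s fine=%s%n", parallelism, s,
            localSplits(coarse, s, parallelism), localSplits(fine, s, parallelism));
      }
    }
  }
}
```

With 8 partitions, the coarse splitting leaves subtasks 2 and 3 idle at parallelism 4, while the per-partition splitting gives every subtask two readers of one partition each, which is the "more readers" cost mentioned below.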

I do not see a problem right away, but it is not easy to reason about because the 
code is quite complex. It will consume somewhat more resources, since there will 
be more Beam Kafka readers.

I have briefly looked at Flink's Kafka consumer. It seems to operate somewhat 
differently in general. I could not find any step that makes an initial splitting 
decision the way Beam does. It looks like all partitions are visible to each source 
instance through partition discovery, and the fetcher is then assigned only the 
ones relevant for the current indexOfThisSubtask(). The fetcher seems to act 
similarly to KafkaUnboundedReader (capable of reading N partitions), but in the end 
it produces fine-grained checkpoint state, Map[Partition, Offset], whereas Beam does 
not. Without changing the contract between 
UnboundedSource <-> Reader <-> CheckpointMark, I do not see a way to optimize this.
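Why fine-grained Map[Partition, Offset] state rescales well can be sketched as follows. This is a simplified illustration under assumptions: `TopicPartition`, `ownerOf` and `redistribute` are hypothetical names, and the topic-hash-plus-partition modulo rule only approximates the idea behind Flink's partition assignment; it is not Flink's exact code. The point is that each offset travels with its own partition, so the state can be redistributed to any new parallelism, whereas an opaque CheckpointMark covering several partitions can only move as a whole.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionStateRescaleSketch {

  // Hypothetical key for fine-grained state: one entry per topic partition.
  record TopicPartition(String topic, int partition) {}

  // Every subtask sees all partitions and keeps those mapped to its own index.
  // Assumed rule: topic-dependent start index plus partition, modulo parallelism.
  static int ownerOf(TopicPartition tp, int numSubtasks) {
    int start = Math.floorMod(tp.topic().hashCode() * 31, numSubtasks);
    return (start + tp.partition()) % numSubtasks;
  }

  // On restore, the union of all checkpointed per-partition offsets is
  // redistributed to the new parallelism; each offset follows its partition.
  static Map<Integer, Map<TopicPartition, Long>> redistribute(
      List<Map<TopicPartition, Long>> oldSubtaskStates, int newParallelism) {
    Map<Integer, Map<TopicPartition, Long>> result = new TreeMap<>();
    for (int i = 0; i < newParallelism; i++) result.put(i, new HashMap<>());
    for (Map<TopicPartition, Long> state : oldSubtaskStates) {
      for (Map.Entry<TopicPartition, Long> e : state.entrySet()) {
        result.get(ownerOf(e.getKey(), newParallelism)).put(e.getKey(), e.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Parallelism 2 before rescaling: each subtask checkpointed offsets for 2 partitions.
    Map<TopicPartition, Long> s0 = Map.of(
        new TopicPartition("events", 0), 100L, new TopicPartition("events", 2), 120L);
    Map<TopicPartition, Long> s1 = Map.of(
        new TopicPartition("events", 1), 90L, new TopicPartition("events", 3), 95L);
    // Rescale to parallelism 4: every subtask ends up with exactly one partition.
    redistribute(List.of(s0, s1), 4).forEach((subtask, offsets) ->
        System.out.println("subtask " + subtask + " -> " + offsets));
  }
}
```

Because consecutive partitions of one topic map to consecutive subtask indices, four partitions over four subtasks always land one per subtask, and no offset is lost or duplicated in the move.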

> Make UnboundedSource state rescale friendly
> -------------------------------------------
>
>                 Key: BEAM-6077
>                 URL: https://issues.apache.org/jira/browse/BEAM-6077
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>    Affects Versions: 2.8.0
>            Reporter: Jozef Vilcek
>            Assignee: Jozef Vilcek
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> FlinkRunner's UnboundedSourceWrapper currently does not rescale its state well 
> when the job changes parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
