[ https://issues.apache.org/jira/browse/FLINK-11050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Liu updated FLINK-11050:
------------------------
    Description: 
    In an IntervalJoin, fetching the entries of the left or right buffer is 
very slow because we have to scan all of the buffer's values, including deleted 
values that are outside the time range. Processing these deleted values consumes 
too much time in RocksDB's level 0. Since lowerBound is known, the lookup can be 
optimized by seeking from the timestamp of lowerBound.

    Our usage is like below:
{code:java}
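// Join each label with ad-log events whose timestamps are 0 to 600000 ms later.
labelStream.keyBy(uuid).intervalJoin(adLogStream.keyBy(uuid))
           .between(Time.milliseconds(0), Time.milliseconds(600000))
           .process(new processFunction())  // user-defined ProcessJoinFunction
           .addSink(kafkaProducer);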
{code}
    Our data volume is huge. The job always runs for an hour and then gets stuck 
in RocksDB's seek while fetching the buffer's entries. We used the RocksDB data 
to reproduce the problem in RocksDB and found that too much time is spent on 
deleted values. We therefore decided to optimize the lookup by seeking from 
lowerBound instead of scanning the whole buffer.
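
    To illustrate the idea, here is a minimal sketch in plain Java. It is not 
Flink or RocksDB code: it assumes a TreeMap as a stand-in for a buffer ordered 
by timestamp, and the class LowerBoundSeekSketch and its method names are 
hypothetical. It contrasts a full scan that filters by time range afterwards 
with a lookup that starts at lowerBound. It does not model RocksDB's deleted 
values, only the smaller number of entries visited, and it assumes 
lowerBound <= upperBound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for a timestamp-ordered join buffer; not Flink code. */
final class LowerBoundSeekSketch {

    /** Full scan: visits every entry and filters by time range afterwards. */
    static List<String> scanAll(NavigableMap<Long, String> buffer,
                                long lowerBound, long upperBound) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<Long, String> entry : buffer.entrySet()) {
            long timestamp = entry.getKey();
            if (timestamp >= lowerBound && timestamp <= upperBound) {
                matches.add(entry.getValue());
            }
        }
        return matches;
    }

    /** Seek: starts at the first key >= lowerBound and stops after upperBound. */
    static List<String> seekFromLowerBound(NavigableMap<Long, String> buffer,
                                           long lowerBound, long upperBound) {
        // subMap is a view over the sorted map, so entries below lowerBound
        // are never iterated. Assumes lowerBound <= upperBound.
        return new ArrayList<>(
                buffer.subMap(lowerBound, true, upperBound, true).values());
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> buffer = new TreeMap<>();
        buffer.put(100L, "a");
        buffer.put(200L, "b");
        buffer.put(300L, "c");
        buffer.put(400L, "d");

        // Both lookups return the same entries; only the work done differs.
        System.out.println(scanAll(buffer, 200L, 300L));
        System.out.println(seekFromLowerBound(buffer, 200L, 300L));
    }
}
```

    Both methods return the same entries for the same bounds. The difference is 
that the second one never touches keys below lowerBound, which is where the 
expired entries sit in our case.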

  was:
    In an IntervalJoin, fetching the entries of the left or right buffer is 
very slow because we have to scan all of the buffer's values, including deleted 
values that are outside the time range. Processing these deleted values consumes 
too much time in RocksDB's level 0. Since lowerBound is known, the lookup can be 
optimized by seeking from the timestamp of lowerBound.

    Our usage is like below:
{code:java}
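// Join each label with ad-log events whose timestamps are 0 to 600000 ms later.
labelStream.keyBy(uuid).intervalJoin(adLogStream.keyBy(uuid))
           .between(Time.milliseconds(0), Time.milliseconds(600000))
           .process(new processFunction())  // user-defined ProcessJoinFunction
           .addSink(kafkaProducer);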
{code}
    Our data volume is huge. The job always runs for an hour and then gets stuck 
in RocksDB's seek while fetching the buffer's entries. We used the RocksDB data 
to reproduce the problem in RocksDB. 


> When IntervalJoin, get left or right buffer's entries more quickly by 
> assigning lowerBound
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11050
>                 URL: https://issues.apache.org/jira/browse/FLINK-11050
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.2, 1.7.0
>            Reporter: Liu
>            Priority: Major
>              Labels: performance
>             Fix For: 1.7.1
>
>
>     In an IntervalJoin, fetching the entries of the left or right buffer is 
> very slow because we have to scan all of the buffer's values, including 
> deleted values that are outside the time range. Processing these deleted 
> values consumes too much time in RocksDB's level 0. Since lowerBound is 
> known, the lookup can be optimized by seeking from the timestamp of lowerBound.
>     Our usage is like below:
> {code:java}
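> // Join each label with ad-log events whose timestamps are 0 to 600000 ms later.
> labelStream.keyBy(uuid).intervalJoin(adLogStream.keyBy(uuid))
>            .between(Time.milliseconds(0), Time.milliseconds(600000))
>            .process(new processFunction())  // user-defined ProcessJoinFunction
>            .addSink(kafkaProducer);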
> {code}
>     Our data volume is huge. The job always runs for an hour and then gets 
> stuck in RocksDB's seek while fetching the buffer's entries. We used the 
> RocksDB data to reproduce the problem in RocksDB and found that too much 
> time is spent on deleted values. We therefore decided to optimize the lookup 
> by seeking from lowerBound instead of scanning the whole buffer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
