[ https://issues.apache.org/jira/browse/FLINK-17800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17122333#comment-17122333 ]

Yun Tang commented on FLINK-17800:
----------------------------------

[~liyu] Thanks for your suggestion.

[~YordanPavlov], from my local experiments, even if I revert the implementation of 
[OptimizeForPointLookup|https://github.com/facebook/rocksdb/blob/6d113fc066c5816887eb19c84d12c0677a68af2b/options/options.cc#L514-L526],
 this bug still exists in newer RocksDB. That is to say, using {{NoopTransform}} 
(which is not obvious in RocksDB-java) together with the {{kHashSearch}} index type 
can lead to this bug. I have also created a [related 
issue|https://github.com/facebook/rocksdb/issues/6893] in the RocksDB community.
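
For reference, this is roughly the option combination at play, expressed against the RocksDB-java API (a sketch of the equivalent setup, not the exact code path; {{NoopTransform}} has no direct counterpart in RocksDB-java, which is why the combination is easy to miss from the Java side):

{code:java}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, IndexType}

// roughly what the C++ OptimizeForPointLookup linked above installs:
// a kHashSearch index in the table format config, plus (C++ only)
// prefix_extractor.reset(NewNoopTransform())
val columnOptions = new ColumnFamilyOptions()
  .setTableFormatConfig(
    new BlockBasedTableConfig().setIndexType(IndexType.kHashSearch))
{code}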



> RocksDB optimizeForPointLookup results in missing time windows
> --------------------------------------------------------------
>
>                 Key: FLINK-17800
>                 URL: https://issues.apache.org/jira/browse/FLINK-17800
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.10.0, 1.10.1
>            Reporter: Yordan Pavlov
>            Assignee: Yun Tang
>            Priority: Blocker
>             Fix For: 1.11.0, 1.10.2
>
>         Attachments: MissingWindows.scala, MyMissingWindows.scala
>
>
> +My Setup:+
> We have been using the _RocksDb_ option _optimizeForPointLookup_ and 
> running Flink 1.7 for years. Upon upgrading to Flink 1.10 we started 
> observing strange behavior: missing time windows in a streaming Flink 
> job. For testing purposes I experimented with previous Flink versions 
> (1.8, 1.9, 1.9.3), and none of them showed the problem.
>  
> A sample of the code demonstrating the problem is here:
> {code:java}
> val datastream = env
>   .addSource(KafkaSource.keyedElements(config.kafkaElements,
>     List(config.kafkaBootstrapServer)))
>
> val result = datastream
>   .keyBy(_ => 1)
>   .timeWindow(Time.milliseconds(1))
>   // a window function is needed before printing; this reduce simply
>   // keeps the latest element of each window
>   .reduce((_, current) => current)
>
> result.print()
> {code}
>  
>  
> The source consists of 3 streams (being either 3 Kafka partitions or 3 Kafka 
> topics); the elements in each stream have separately increasing event-time 
> timestamps. Taken together, the timestamps start from 1 and increase by 1. 
> The first partition would consist of timestamps 1, 2, 10, 15..., the second 
> of 4, 5, 6, 11..., the third of 3, 7, 8, 9...
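> A minimal stand-in for such a source (hypothetical; the real job uses 
> KafkaSource.keyedElements) could look like this:
> {code:java}
> import org.apache.flink.streaming.api.scala._
>
> // three streams, each with its own strictly increasing event timestamps,
> // whose union covers 1, 2, 3, ... (the values double as timestamps here)
> def partition(ts: Long*): DataStream[Long] =
>   env.fromCollection(ts).assignAscendingTimestamps(identity)
>
> val datastream = partition(1L, 2L, 10L, 15L)
>   .union(partition(4L, 5L, 6L, 11L))
>   .union(partition(3L, 7L, 8L, 9L))
> {code}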
>  
> +What I observe:+
> The time windows open as I expect for the first 127 timestamps. Then there 
> is a huge gap with no opened windows; if the source has many elements, the 
> next window to open has a timestamp in the thousands. A gap of hundreds of 
> elements is created, with what appear to be 'lost' elements. Those elements 
> are not reported as late (when tested with the _sideOutputLateData_ 
> operator). The way we have been using the option is by setting it inside 
> our config like so (how this is typically wired into Flink is sketched 
> below):
> ??etherbi.rocksDB.columnOptions.optimizeForPointLookup=268435456??
> We have been using it for performance reasons, as we have a huge RocksDB 
> state backend.
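> A typical way to wire such a column option into a Flink job (a sketch; the 
> etherbi.* key above is our own config plumbing, and the checkpoint path and 
> factory name here are made up) is a RocksDBOptionsFactory:
> {code:java}
> import java.util
> import org.apache.flink.contrib.streaming.state.{RocksDBOptionsFactory, RocksDBStateBackend}
> import org.rocksdb.{ColumnFamilyOptions, DBOptions}
>
> class PointLookupOptionsFactory extends RocksDBOptionsFactory {
>   override def createDBOptions(
>       currentOptions: DBOptions,
>       handlesToClose: util.Collection[AutoCloseable]): DBOptions =
>     currentOptions
>
>   override def createColumnOptions(
>       currentOptions: ColumnFamilyOptions,
>       handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions =
>     // 268435456 is the value from our config above
>     currentOptions.optimizeForPointLookup(268435456L)
> }
>
> val backend = new RocksDBStateBackend("file:///tmp/checkpoints", true)
> backend.setRocksDBOptions(new PointLookupOptionsFactory)
> env.setStateBackend(backend)
> {code}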



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
