[https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962314#comment-15962314]
ASF GitHub Bot commented on FLINK-5756:
---------------------------------------
GitHub user rmetzger opened a pull request:
https://github.com/apache/flink/pull/3704
[FLINK-5756] Replace RocksDB dependency with FRocksDB
@StefanRRichter has created a custom RocksDB release that fixes FLINK-5756.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rmetzger/flink flink5756
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3704.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3704
----
commit b05c595a37ea5b3a08ef4a11d9259eb7aabee005
Author: Robert Metzger
Date: 2017-04-09T20:05:08Z
[FLINK-5756] Replace RocksDB dependency with FRocksDB
----
> When there are many values under the same key in ListState,
> RocksDBStateBackend performances poor
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.2.0
> Environment: CentOS 7.2
> Reporter: Syinchwun Leo
>
> When using RocksDB as the state backend, if there are many values under the
> same key in ListState, the windowState.get() operation performs very poorly.
> I also tested RocksDB version 4.11.2, and the performance is also very poor.
> The problem is likely related to RocksDB's own get() operation after merge()
> has been used. It may hurt the performance of window operations when a window
> holds a large number of elements in ListState. I tried merging 50000 values
> under the same key in RocksDB, and the subsequent get() operation took 120
> seconds.
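The sketch below is a simplified Scala model that shows why a single get() can be slow after many merge() calls. It illustrates the idea under stated assumptions and is not RocksDB's implementation. It assumes merge() only records an operand and get() folds every pending operand with a delimiter byte, the way a string-append operator would. The function names are hypothetical:
- `pairwiseGet` combines operands two at a time and copies the accumulated value at every step.
- `fullMergeGet` sizes the result once and copies each operand exactly once.

Both functions return the merged value plus the number of bytes copied.

```scala
// Simplified model of merge-operand folding on get(); NOT RocksDB's actual code.
// Each function returns (merged value, number of bytes copied).
object MergeModel {

  // Hypothetical pairwise (associative) folding: every step allocates a new
  // buffer and copies the whole accumulated value plus the next operand.
  def pairwiseGet(operands: Seq[Array[Byte]], delim: Byte): (Array[Byte], Long) = {
    var acc: Array[Byte] = null
    var copied = 0L
    for (op <- operands) {
      val next =
        if (acc == null) op.clone()
        else {
          val buf = new Array[Byte](acc.length + 1 + op.length)
          System.arraycopy(acc, 0, buf, 0, acc.length)
          buf(acc.length) = delim
          System.arraycopy(op, 0, buf, acc.length + 1, op.length)
          buf
        }
      copied += next.length
      acc = next
    }
    (if (acc == null) Array.emptyByteArray else acc, copied)
  }

  // Hypothetical full merge: compute the final size once, then copy each
  // operand (and each delimiter) exactly once into a single buffer.
  def fullMergeGet(operands: Seq[Array[Byte]], delim: Byte): (Array[Byte], Long) =
    if (operands.isEmpty) (Array.emptyByteArray, 0L)
    else {
      val total = operands.map(_.length).sum + operands.length - 1
      val buf = new Array[Byte](total)
      var pos = 0
      for ((op, i) <- operands.zipWithIndex) {
        if (i > 0) { buf(pos) = delim; pos += 1 }
        System.arraycopy(op, 0, buf, pos, op.length)
        pos += op.length
      }
      (buf, total.toLong)
    }
}
```

Take n operands of L bytes each. In this model the pairwise variant copies (L + 1) * n * (n + 1) / 2 - n bytes, while the full merge copies n * L + n - 1. For 50000 operands of about 75 bytes, that is roughly 10^11 bytes against a few megabytes. Whether RocksDB 4.11.2's string-append merge behaves like the pairwise case is an assumption to verify, not something this sketch demonstrates.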
> ///////////////////////////////////////////////////////////////////////////////
> The Flink code is as follows:
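> {code}
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction
> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
>
> // SEvent (a case class) and WindowStatistic (a WindowFunction) are defined
> // elsewhere in the job and are not shown here.
> class SEventSource extends RichSourceFunction[SEvent] {
>   @volatile private var running = true
>   private var count = 0L
>   private val alphabet =
>     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
>     while (running) {
>       // Emit 5000 events per round, all with the same key field (1)
>       for (_ <- 0 until 5000) {
>         sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
>         count += 1L
>       }
>       Thread.sleep(1000)
>     }
>   }
>
>   // SourceFunction.cancel() is abstract and must be implemented
>   override def cancel(): Unit = running = false
> }
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // Use event time so the periodic watermarks drive the sliding windows
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
>     // Both watermarks and timestamps follow the wall clock
>     override def getCurrentWatermark: Watermark =
>       new Watermark(System.currentTimeMillis())
>     override def extractTimestamp(t: SEvent, l: Long): Long =
>       System.currentTimeMillis()
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}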
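The job above produces a large per-key window state, and the rough sizing below shows how large. It assumes each element is appended once to the ListState of every sliding window it falls into. Every event carries the same first field (1), so keyBy(0) routes all of them to a single key. The source emits at most about 5000 events per second (5000 per round plus a 1-second sleep). The object and field names are hypothetical.

```scala
// Back-of-the-envelope sizing for the sliding-window job above.
// Assumption: each element is appended to the ListState of every window it belongs to.
object WindowStateSizing {
  val eventsPerSecond = 5000L // upper bound: 5000 events per round, then a ~1 s sleep
  val windowSizeSec   = 20L   // SlidingEventTimeWindows size
  val slideSec        = 2L    // SlidingEventTimeWindows slide

  // Number of overlapping windows each element is assigned to
  val windowsPerElement: Long = windowSizeSec / slideSec

  // Elements accumulated in one window (all under key 1) before it fires
  val elementsPerWindow: Long = eventsPerSecond * windowSizeSec

  // List entries appended to state per second across all open windows of the key
  val stateAppendsPerSecond: Long = eventsPerSecond * windowsPerElement
}
```

Under these assumptions, each element lands in 10 windows, and each window collects about 100000 entries under one key before it fires. Reading such a window back is the same access pattern as the standalone RocksDB test, with twice its 50000 values.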
> ////////////////////////////////////
> The RocksDB Test code:
> {code}
> import org.rocksdb.{CompactionStyle, CompressionType, Options, RocksDB, StringAppendOperator, WriteOptions}
>
> object RocksDBMergeTest {
>   def main(args: Array[String]): Unit = {
>     RocksDB.loadLibrary()
>     val stringAppendOperator = new StringAppendOperator
>     val options = new Options()
>     options.setCompactionStyle(CompactionStyle.LEVEL)
>       .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>       .setLevelCompactionDynamicLevelBytes(true)
>       .setIncreaseParallelism(4)
>       .setUseFsync(true)
>       .setMaxOpenFiles(-1)
>       .setCreateIfMissing(true)
>       .setMergeOperator(stringAppendOperator)
>     val write_options = new WriteOptions
>     write_options.setSync(false)
>     val rocksDB = RocksDB.open(options, "/******/Data/")
>     val key = "key"
>     val value =
>       "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
>     val beginmerge = System.currentTimeMillis()
>     // Append 50000 merge operands to the same key
>     for (i <- 0 until 50000) {
>       rocksDB.merge(write_options, key.getBytes(), ("s" + i + value).getBytes())
>       //rocksDB.put(key.getBytes, value.getBytes)
>     }
>     println("finish")
>     // Time a single get(), which has to combine the pending merge operands for the key
>     val begin = System.currentTimeMillis()
>     rocksDB.get(key.getBytes)
>     val end = System.currentTimeMillis()
>     println("merge cost:" + (begin - beginmerge))
>     println("Time consuming:" + (end - begin))
>   }
> }
> {code}
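One way to check whether get() time grows linearly or quadratically with the number of merge operands is to rerun the test above at several operand counts. You can then estimate the exponent k in time ≈ c * n^k with a least-squares fit on a log-log scale. The helper below is a hypothetical sketch of that fit. It takes (operand count, milliseconds) pairs and does not call RocksDB itself.

```scala
// Hypothetical helper: estimate the growth exponent k in t ≈ c * n^k
// via ordinary least squares on (log n, log t).
object GrowthExponent {
  def estimate(samples: Seq[(Long, Double)]): Double = {
    require(samples.size >= 2, "need at least two samples")
    require(samples.forall { case (n, t) => n > 0 && t > 0 }, "n and t must be positive")
    val xs = samples.map { case (n, _) => math.log(n.toDouble) }
    val ys = samples.map { case (_, t) => math.log(t) }
    val mx = xs.sum / xs.size
    val my = ys.sum / ys.size
    // Slope of the regression line = cov(x, y) / var(x)
    val sxy = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
    val sxx = xs.map(x => (x - mx) * (x - mx)).sum
    require(sxx > 0, "operand counts must not all be equal")
    sxy / sxx
  }
}
```

A slope near 1 suggests constant work per operand. A slope near 2 suggests quadratic behavior, such as repeatedly copying the accumulated value.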
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)