[
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925393#comment-15925393
]
Syinchwun Leo edited comment on FLINK-5756 at 3/15/17 1:43 AM:
---------------------------------------------------------------
Is it possible to avoid the merge() operation? I notice that the result
of RocksDB's get() is a byte array. My suggestion is that the add() method
of RocksDBListState first calls get() to obtain the current byte array, appends
the new value's serialized byte[] to it, and then writes the result back to
RocksDB. This way there is only ever one byte[] stored under the key. I haven't
tested the idea; the performance may not be ideal and the approach may be awkward.
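
A minimal sketch of the get-append-put idea described above, not a tested patch against Flink. Everything named here is an assumption for illustration: `InMemoryStore` is a hypothetical stand-in for the RocksDB get()/put() calls, `AppendingListState` is not the real RocksDBListState, and the 4-byte length prefix per element is just one possible framing so that the single byte[] can be split back into elements on read.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable

// Hypothetical stand-in for a byte[] key/value store such as RocksDB.
// A real backend would call rocksDB.get(...) and rocksDB.put(...) instead.
class InMemoryStore {
  private val data = mutable.Map.empty[String, Array[Byte]]
  // Returns null when the key is absent, mirroring RocksDB's get().
  def get(key: String): Array[Byte] = data.get(key).orNull
  def put(key: String, value: Array[Byte]): Unit = data.update(key, value)
}

object AppendingListState {
  // add(): read the current byte[], append one length-prefixed element, write it back.
  def add(store: InMemoryStore, key: String, element: Array[Byte]): Unit = {
    val existing = Option(store.get(key)).getOrElse(Array.emptyByteArray)
    val buffer = new ByteArrayOutputStream(existing.length + 4 + element.length)
    buffer.write(existing, 0, existing.length)
    val out = new DataOutputStream(buffer)
    out.writeInt(element.length) // assumed framing: 4-byte length, then payload
    out.write(element, 0, element.length)
    out.flush()
    store.put(key, buffer.toByteArray)
  }

  // get(): a single read, then split the byte[] back into its elements.
  def get(store: InMemoryStore, key: String): List[Array[Byte]] = {
    val bytes = store.get(key)
    if (bytes == null) Nil
    else {
      val in = new DataInputStream(new ByteArrayInputStream(bytes))
      val elements = List.newBuilder[Array[Byte]]
      while (in.available() > 0) {
        val element = new Array[Byte](in.readInt())
        in.readFully(element)
        elements += element
      }
      elements.result()
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new InMemoryStore
    Seq("a", "bb", "ccc").foreach(s => add(store, "key", s.getBytes("UTF-8")))
    println(get(store, "key").map(new String(_, "UTF-8")).mkString(","))
  }
}
```

With this layout get() has no merge operands to fold, but every add() copies the whole existing value, so n appends copy on the order of n^2 bytes in total. Whether that trade is better than merge() for large lists would have to be measured.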
was (Author: syinchwunleo):
Is it possible to avoid the merge() operation? I notice that the result
of RocksDB's get() is a byte array. My suggestion is that the add() method
of RocksDBListState first calls get() to obtain the current byte array, appends
the new value's serialized byte[] to it, and then writes the result back to
RocksDB. I haven't tested the idea; the performance may not be ideal and the
approach may be awkward.
> When there are many values under the same key in ListState,
> RocksDBStateBackend performs poorly
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.2.0
> Environment: CentOS 7.2
> Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the
> same key in ListState, the windowState.get() operation performs very poorly.
> I also tested with RocksDB version 4.11.2, and the performance is also very
> poor. The problem is likely related to RocksDB's own get() operation
> after merge() has been used. It may affect the performance of window
> operations that use ListState when the window is very large. I tried merging
> 50000 values under the same key in RocksDB, and it took 120 seconds to execute
> the get() operation.
> ///////////////////////////////////////////////////////////////////////////////
> The Flink code is as follows:
> {code}
> class SEventSource extends RichSourceFunction[SEvent] {
>   @volatile private var running = true
>   private var count = 0L
>   private val alphabet =
>     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   // Emit 5000 events for the same key every second.
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
>     while (running) {
>       for (i <- 0 until 5000) {
>         sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
>         count += 1L
>       }
>       Thread.sleep(1000)
>     }
>   }
>   // SourceFunction requires cancel(); stop the emit loop.
>   override def cancel(): Unit = {
>     running = false
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
>     override def getCurrentWatermark: Watermark = {
>       new Watermark(System.currentTimeMillis())
>     }
>     override def extractTimestamp(t: SEvent, l: Long): Long = {
>       System.currentTimeMillis()
>     }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> ////////////////////////////////////
> The RocksDB test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/******/Data/")
> val key = "key"
> val value =
>   "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for (i <- 0 to 50000) {
>   rocksDB.merge(key.getBytes(), ("s" + i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
> }
> }
> {code}