Example SQL:

    SELECT * FROM stream1 s1, stream2 s2
    WHERE s1.id = s2.id AND s1.rowtime = s2.rowtime
Many messages in stream1 and stream2 share the same rowtime. The job runs fine with a heap-based state backend, but it sometimes needs a lot of heap memory (for example, when the upstream sources are out of sync), and there is a risk of full GC. When we switched to RocksDBStateBackend to reduce heap usage, the job became unbearably slow. After examining the code, we found that org.apache.flink.table.runtime.join.TimeBoundedStreamJoin#processElement1() may be the cause (we are using Flink 1.6, but 1.8 should be the same):

    ...
    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      var leftRowList = leftCache.get(timeForLeftRow)
      if (null == leftRowList) {
        leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      }
      leftRowList.add(JTuple2.of(leftRow, emitted))
      leftCache.put(timeForLeftRow, leftRowList)
    ...

In the code above, all left rows with the same timeForLeftRow are stored as one list value under a single key. With RocksDBStateBackend, every leftCache.get() deserializes the whole list and every leftCache.put() serializes it again, so caching n rows with the same timestamp costs O(n) per row and O(n^2) in total. When many messages share a timeForLeftRow, the serialization and deserialization cost becomes very high.

A simple fix I came up with is to key the cache by (timestamp, index), store one row per entry, and keep a per-timestamp row count in a separate MapState:

    ...
    // cache to store rows from the left stream
    //private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
    // key: (timestamp, index within that timestamp); value: a single (row, emitted) pair
    private var leftCache: MapState[JTuple2[Long, Integer], JTuple2[Row, Boolean]] = _
    // number of rows cached so far for each timestamp
    private var leftCacheSize: MapState[Long, Integer] = _
    ...
    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      //var leftRowList = leftCache.get(timeForLeftRow)
      //if (null == leftRowList) {
      //  leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      //}
      //leftRowList.add(JTuple2.of(leftRow, emitted))
      //leftCache.put(timeForLeftRow, leftRowList)
      var leftRowListSize = leftCacheSize.get(timeForLeftRow)
      if (null == leftRowListSize) {
        leftRowListSize = Integer.valueOf(0)
      }
      // one small entry per row instead of rewriting a growing list
      leftCache.put(JTuple2.of(timeForLeftRow, leftRowListSize), JTuple2.of(leftRow, emitted))
      leftCacheSize.put(timeForLeftRow, leftRowListSize + 1)
    ...

-- LIU Xiao <xiao.liu...@qq.com>
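To make the cost difference between the two cache layouts concrete, here is a small standalone Scala sketch. It does not use Flink at all: SerializingMapState is a hypothetical stand-in for a RocksDB-backed MapState that keeps values only as bytes, so every get() deserializes and every put() re-serializes the whole value. It uses plain Java serialization instead of Flink's TypeSerializers, so absolute byte counts are illustrative only; what matters is how they grow with the number of rows sharing one timestamp.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical stand-in for a RocksDB-backed MapState (NOT Flink's API):
// values live only as bytes, so get() deserializes and put() serializes the whole value.
class SerializingMapState[K, V] {
  private val store = mutable.HashMap.empty[K, Array[Byte]]
  var bytesMoved: Long = 0L // bytes serialized by put() plus bytes deserialized by get()

  def get(key: K): Option[V] = store.get(key).map { bytes =>
    bytesMoved += bytes.length
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[V] finally in.close()
  }

  def put(key: K, value: V): Unit = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    try out.writeObject(value.asInstanceOf[AnyRef]) finally out.close()
    val bytes = buf.toByteArray
    bytesMoved += bytes.length
    store.put(key, bytes)
  }
}

object CacheLayoutDemo {
  // Simplified stand-in for JTuple2[Row, Boolean]: (row payload, emitted flag).
  type CachedRow = (String, Boolean)
  private val Ts = 42L // every row shares the same timestamp, as in the report

  // Current layout: one list per timestamp. Each insert reads, appends to and
  // rewrites the whole list, so n inserts move O(n^2) bytes.
  def listPerTimestampBytes(n: Int): Long = {
    val cache = new SerializingMapState[Long, Vector[CachedRow]]
    for (i <- 0 until n) {
      val rows = cache.get(Ts).getOrElse(Vector.empty[CachedRow])
      val row: CachedRow = (s"row-$i", false)
      cache.put(Ts, rows :+ row)
    }
    cache.bytesMoved
  }

  // Proposed layout: key (timestamp, index) holds one row, plus a per-timestamp
  // counter, so each insert touches a constant number of small values: O(n) bytes.
  def rowPerKeyBytes(n: Int): Long = {
    val cache = new SerializingMapState[(Long, Int), CachedRow]
    val sizes = new SerializingMapState[Long, Int]
    for (i <- 0 until n) {
      val size = sizes.get(Ts).getOrElse(0)
      cache.put((Ts, size), (s"row-$i", false))
      sizes.put(Ts, size + 1)
    }
    cache.bytesMoved + sizes.bytesMoved
  }

  def main(args: Array[String]): Unit =
    for (n <- Seq(100, 1000))
      println(s"n=$n: list-per-timestamp ${listPerTimestampBytes(n)} bytes, " +
        s"row-per-key ${rowPerKeyBytes(n)} bytes")
}
```

Going from 100 to 1000 rows per timestamp should multiply the list-per-timestamp figure by roughly 100 but the row-per-key figure by only about 10. The sketch models inserts only; the elided code that reads and cleans up leftCache would presumably also have to work with the (timestamp, index) keys.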