Example SQL:

FROM stream1 s1, stream2 s2
WHERE s1.id = s2.id AND s1.rowtime = s2.rowtime

We also have lots of messages in stream1 and stream2 that share the same rowtime.

It runs fine when using heap as the state backend,
but it sometimes requires a lot of heap memory (when the upstreams are out of sync, etc.),
and there is a risk of full GC.

When we use RocksDBStateBackend to lower the heap memory usage, we found that our
program runs unbearably slowly.

After examining the code, we found that the following
may be the cause of the problem (we are using Flink 1.6, but 1.8 should be the same):
    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      var leftRowList = leftCache.get(timeForLeftRow)
      if (null == leftRowList) {
        leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      }
      leftRowList.add(JTuple2.of(leftRow, emitted))
      leftCache.put(timeForLeftRow, leftRowList)
    }

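In the above code, if there are lots of messages with the same timeForLeftRow,
the whole list for that timestamp is deserialized on every get and serialized
again on every put, so the serialization and deserialization cost will be very
high when using RocksDBStateBackend.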

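To illustrate the cost, here is a minimal sketch that does not use Flink at all. It assumes plain Java serialization as a stand-in for the state serializer, and the names (ListStateCostSketch, bytesWrittenWholeList, bytesWrittenPerRow) are made up for this example. It only compares how many bytes get written when every append rewrites the whole list versus when every row is written as its own entry:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.{ArrayList => JArrayList}

// Hypothetical, Flink-free illustration; Java serialization stands in
// for the state backend's serializer (an assumption of this sketch).
object ListStateCostSketch {

  // Serialize a value and return the number of bytes produced.
  def serializedSize(value: AnyRef): Long = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.size().toLong
  }

  // Layout A: one list per timestamp. Every append serializes the
  // whole list again, so the total grows roughly quadratically.
  def bytesWrittenWholeList(rows: Int): Long = {
    val list = new JArrayList[String]()
    var total = 0L
    for (i <- 0 until rows) {
      list.add(s"row-$i")
      total += serializedSize(list)
    }
    total
  }

  // Layout B: one entry per row. Every append serializes only the
  // new row, so the total grows linearly.
  def bytesWrittenPerRow(rows: Int): Long = {
    var total = 0L
    for (i <- 0 until rows) {
      total += serializedSize(s"row-$i")
    }
    total
  }
}
```

Comparing ListStateCostSketch.bytesWrittenWholeList(n) with ListStateCostSketch.bytesWrittenPerRow(n) for a growing n shows the gap widening quickly. The read side (deserializing the list on every get) is not counted here and would add to it.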
A simple fix I came up with:
  // cache to store rows from the left stream
  //private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
  // keyed by (timestamp, index), so each row is stored as its own entry
  private var leftCache: MapState[JTuple2[Long, Integer], JTuple2[Row, Boolean]] = _
  // number of rows cached per timestamp
  private var leftCacheSize: MapState[Long, Integer] = _
    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      //var leftRowList = leftCache.get(timeForLeftRow)
      //if (null == leftRowList) {
      //  leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      //}
      //leftRowList.add(JTuple2.of(leftRow, emitted))
      //leftCache.put(timeForLeftRow, leftRowList)
      var leftRowListSize = leftCacheSize.get(timeForLeftRow)
      if (null == leftRowListSize) {
        leftRowListSize = Integer.valueOf(0)
      }
      // Store the row under (timestamp, index) and bump the per-timestamp counter.
      leftCache.put(JTuple2.of(timeForLeftRow, leftRowListSize), JTuple2.of(leftRow, emitted))
      leftCacheSize.put(timeForLeftRow, leftRowListSize + 1)
    }
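With this layout, the places that read or clean up the cache would also have to change: instead of fetching one list per timestamp, they would walk the indices 0 until size. Below is a rough, Flink-free sketch of that access pattern. The two mutable maps stand in for the leftCache and leftCacheSize states, rows are plain strings, and all names (IndexedCacheSketch, append, rowsAt, remove) are hypothetical and only for illustration:

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for the two MapState fields.
object IndexedCacheSketch {
  // (timestamp, index) -> row
  val cache = mutable.Map.empty[(Long, Int), String]
  // timestamp -> number of rows stored for that timestamp
  val cacheSize = mutable.Map.empty[Long, Int]

  // Append one row: writes a single new entry plus the counter.
  def append(time: Long, row: String): Unit = {
    val size = cacheSize.getOrElse(time, 0)
    cache.put((time, size), row)
    cacheSize.put(time, size + 1)
  }

  // Read all rows for a timestamp, in insertion order.
  def rowsAt(time: Long): Seq[String] = {
    val size = cacheSize.getOrElse(time, 0)
    (0 until size).map(i => cache((time, i)))
  }

  // Drop all rows for a timestamp, e.g. when it leaves the join window.
  def remove(time: Long): Unit = {
    val size = cacheSize.getOrElse(time, 0)
    (0 until size).foreach(i => cache.remove((time, i)))
    cacheSize.remove(time)
  }
}
```

The trade-off is that a read or cleanup for one timestamp now needs one lookup per row instead of one lookup per list, while an append only touches the new row and the counter.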

LIU Xiao <xiao.liu...@qq.com>
