Example SQL:

SELECT *
FROM stream1 s1, stream2 s2
WHERE s1.id = s2.id AND s1.rowtime = s2.rowtime

Many messages in stream1 and stream2 share the same rowtime.

The job runs fine with the heap state backend, but it sometimes needs a
lot of heap memory (for example when the upstream sources are out of sync),
and there is a risk of full GC pauses.

When we switched to RocksDBStateBackend to lower the heap memory usage, the
program became unbearably slow.

After examining the code, we found that
org.apache.flink.table.runtime.join.TimeBoundedStreamJoin#processElement1()
may be the cause of the problem (we are using Flink 1.6, but 1.8 should be
the same):
...
    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      var leftRowList = leftCache.get(timeForLeftRow)
      if (null == leftRowList) {
        leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      }
      leftRowList.add(JTuple2.of(leftRow, emitted))
      leftCache.put(timeForLeftRow, leftRowList)
...

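In the code above, every row is appended by reading the whole list for
timeForLeftRow, adding one element, and writing the whole list back. With
RocksDBStateBackend each get() deserializes and each put() serializes the
complete list, so when many messages share the same timeForLeftRow the
serialization and deserialization cost becomes very high.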
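
To make the cost concrete, here is a rough back-of-the-envelope model in
Scala. It does not use Flink at all; it only counts how many rows pass
through the serializer under the assumption that a get() reads the whole
stored value and a put() writes the whole stored value. The object and
method names are made up for this illustration.

```scala
// Hypothetical cost model, not Flink code: counts rows that would be
// (de)serialized if every get/put handles the complete stored value.
object ListValueCost {

  // n rows with the same timestamp appended to one list-valued entry
  // (the get / add / put pattern in processElement1).
  def listLayout(n: Int): Long = {
    var touched = 0L
    var size = 0            // current length of the stored list
    for (_ <- 1 to n) {
      touched += size       // get: deserialize the existing list
      size += 1
      touched += size       // put: serialize the list including the new row
    }
    touched
  }

  // n rows each stored under its own key: every row is written once.
  // (The small per-timestamp counter is ignored here.)
  def perRowLayout(n: Int): Long = n.toLong

  def main(args: Array[String]): Unit = {
    for (n <- Seq(10, 1000, 100000))
      println(s"n=$n list=${listLayout(n)} perRow=${perRowLayout(n)}")
  }
}
```

Under this model the list layout touches n * n rows for n appends, while
one entry per row touches n rows, which matches the slowdown we see when
many rows share a rowtime.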

A simple fix I came up with:
...
  // cache to store rows from the left stream
  //private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
  // one entry per row, keyed by (row time, index within that row time)
  private var leftCache: MapState[JTuple2[Long, Integer], JTuple2[Row, Boolean]] = _
  private var leftCacheSize: MapState[Long, Integer] = _
...
    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      //var leftRowList = leftCache.get(timeForLeftRow)
      //if (null == leftRowList) {
      //  leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      //}
      //leftRowList.add(JTuple2.of(leftRow, emitted))
      //leftCache.put(timeForLeftRow, leftRowList)
      var leftRowListSize = leftCacheSize.get(timeForLeftRow)
      if (null == leftRowListSize) {
        leftRowListSize = new Integer(0)
      }
      leftCache.put(JTuple2.of(timeForLeftRow, leftRowListSize), JTuple2.of(leftRow, emitted))
      leftCacheSize.put(timeForLeftRow, leftRowListSize + 1)
...
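
The snippet above only covers the write path. The places that read the
cache or clean up expired rows would need a matching change, since a row
time no longer maps to a single list. Below is a minimal sketch of how the
(row time, index) layout could be read and cleared. It uses plain in-memory
maps as stand-ins for MapState, and IndexedRowCache with its methods is a
hypothetical name, not something that exists in Flink.

```scala
import scala.collection.mutable

// Sketch only: mutable.Map stands in for Flink MapState here.
final class IndexedRowCache[R] {
  // (timestamp, index) -> row, mirrors the new leftCache
  private val rows = mutable.Map.empty[(Long, Int), R]
  // timestamp -> number of rows stored, mirrors leftCacheSize
  private val sizes = mutable.Map.empty[Long, Int]

  // Append one row: touches only the counter and one new entry.
  def add(ts: Long, row: R): Unit = {
    val n = sizes.getOrElse(ts, 0)
    rows.put((ts, n), row)
    sizes.put(ts, n + 1)
  }

  // Read all rows for a timestamp in insertion order.
  def get(ts: Long): Seq[R] =
    (0 until sizes.getOrElse(ts, 0)).map(i => rows((ts, i)))

  // Drop all rows of an expired timestamp together with its counter.
  def remove(ts: Long): Unit = {
    (0 until sizes.getOrElse(ts, 0)).foreach(i => rows.remove((ts, i)))
    sizes.remove(ts)
  }
}
```

With this layout, looking up the rows for one row time becomes a loop over
index 0 until the stored size, and cleanup has to remove both the per-row
entries and the counter.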

-- 
LIU Xiao <xiao.liu...@qq.com>
