Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r139723011
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
    @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
         // Initialize the data caches.
         val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
         val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    -      new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
    -        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
    +      new MapStateDescriptor[Long, JList[Row]](
    +        timeIndicator + "InnerJoinLeftCache",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    +        leftListTypeInfo)
         leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
     
         val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
         val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    -      new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
    -        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
    +      new MapStateDescriptor[Long, JList[Row]](
    +        timeIndicator + "InnerJoinRightCache",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    +        rightListTypeInfo)
         rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
     
         // Initialize the timer states.
         val leftTimerStateDesc: ValueStateDescriptor[Long] =
    -      new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
    -        classOf[Long])
    +      new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
         leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
     
         val rightTimerStateDesc: ValueStateDescriptor[Long] =
    -      new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
    -        classOf[Long])
    +      new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
         rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
       }
     
       /**
    -    * Process records from the left stream.
    -    *
    -    * @param cRowValue the input record
    -    * @param ctx       the context to register timer or get current time
    -    * @param out       the collector for outputting results
    -    *
    +    * Process rows from the left stream.
         */
       override def processElement1(
    -    cRowValue: CRow,
    -    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -    out: Collector[CRow]): Unit = {
    -    val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
    -    getCurrentOperatorTime(ctx)
    +      cRowValue: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +    updateOperatorTime(ctx)
    +    val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
    +    val oppositeLowerBound: Long = rowTime - rightRelativeSize
    +    val oppositeUpperBound: Long = rowTime + leftRelativeSize
         processElement(
           cRowValue,
    -      timeForRecord,
    +      rowTime,
           ctx,
           out,
           leftOperatorTime,
    +      oppositeLowerBound,
    +      oppositeUpperBound,
           rightOperatorTime,
           rightTimerState,
           leftCache,
           rightCache,
    -      true
    +      leftRow = true
         )
       }
     
       /**
    -    * Process records from the right stream.
    -    *
    -    * @param cRowValue the input record
    -    * @param ctx       the context to get current time
    -    * @param out       the collector for outputting results
    -    *
    +    * Process rows from the right stream.
         */
       override def processElement2(
    -    cRowValue: CRow,
    -    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -    out: Collector[CRow]): Unit = {
    -    val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
    -    getCurrentOperatorTime(ctx)
    +      cRowValue: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +    updateOperatorTime(ctx)
    +    val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
    +    val oppositeLowerBound: Long = rowTime - leftRelativeSize
    +    val oppositeUpperBound: Long =  rowTime + rightRelativeSize
         processElement(
           cRowValue,
    -      timeForRecord,
    +      rowTime,
           ctx,
           out,
           rightOperatorTime,
    +      oppositeLowerBound,
    +      oppositeUpperBound,
           leftOperatorTime,
           leftTimerState,
           rightCache,
           leftCache,
    -      false
    +      leftRow = false
         )
       }
     
       /**
    -    * Put a record from the input stream into the cache and iterate the 
opposite cache to
    -    * output records meeting the join conditions. If there is no timer set 
for the OPPOSITE
    +    * Put a row from the input stream into the cache and iterate the 
opposite cache to
    +    * output join results meeting the conditions. If there is no timer set 
for the OPPOSITE
         * STREAM, register one.
         */
       private def processElement(
    -    cRowValue: CRow,
    -    timeForRecord: Long,
    -    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -    out: Collector[CRow],
    -    myWatermark: Long,
    -    oppositeWatermark: Long,
    -    oppositeTimeState: ValueState[Long],
    -    recordListCache: MapState[Long, JList[Row]],
    -    oppositeCache: MapState[Long, JList[Row]],
    -    leftRecord: Boolean): Unit = {
    -    if (relativeWindowSize > 0) {
    -      //TODO Shall we consider adding a method for initialization with the 
context and collector?
    -      cRowWrapper.out = out
    -
    -      val record = cRowValue.row
    -
    -      //TODO Only if the time of the record is greater than the watermark, 
can we continue.
    -      if (timeForRecord >= myWatermark - allowedLateness) {
    -        val oppositeLowerBound: Long =
    -          if (leftRecord) timeForRecord - rightRelativeSize else 
timeForRecord - leftRelativeSize
    -
    -        val oppositeUpperBound: Long =
    -          if (leftRecord) timeForRecord + leftRelativeSize else 
timeForRecord + rightRelativeSize
    -
    -        // Put the record into the cache for later use.
    -        val recordList = if (recordListCache.contains(timeForRecord)) {
    -          recordListCache.get(timeForRecord)
    -        } else {
    -          new util.ArrayList[Row]()
    -        }
    -        recordList.add(record)
    -        recordListCache.put(timeForRecord, recordList)
    -
    -        // Register a timer on THE OTHER STREAM to remove records from the 
cache once they are
    -        // expired.
    -        if (oppositeTimeState.value == 0) {
    -          registerCleanUpTimer(
    -            ctx, timeForRecord, oppositeWatermark, oppositeTimeState, 
leftRecord, true)
    -        }
    +      cRowValue: CRow,
    +      timeForRow: Long,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow],
    +      myWatermark: Long,
    +      oppositeLowerBound: Long,
    +      oppositeUpperBound: Long,
    +      oppositeWatermark: Long,
    +      oppositeTimeState: ValueState[Long],
    +      rowListCache: MapState[Long, JList[Row]],
    +      oppositeCache: MapState[Long, JList[Row]],
    +      leftRow: Boolean): Unit = {
    +    cRowWrapper.out = out
    +    val row = cRowValue.row
    +    if (!checkRowOutOfDate(timeForRow, myWatermark)) {
    +      // Put the row into the cache for later use.
    +      var rowList = rowListCache.get(timeForRow)
    +      if (null == rowList) {
    +        rowList = new ArrayList[Row](1)
    +      }
    +      rowList.add(row)
    +      rowListCache.put(timeForRow, rowList)
    +      // Register a timer on THE OPPOSITE STREAM to remove rows from the 
cache once they are
    +      // expired.
    +      if (oppositeTimeState.value == 0) {
    +        registerCleanUpTimer(
    +          ctx, timeForRow, oppositeWatermark, oppositeTimeState, leftRow, 
firstTimer = true)
    +      }
     
    -        // Join the record with records from the opposite stream.
    -        val oppositeIterator = oppositeCache.iterator()
    -        var oppositeEntry: Entry[Long, util.List[Row]] = null
    -        var oppositeTime: Long = 0L;
    -        while (oppositeIterator.hasNext) {
    -          oppositeEntry = oppositeIterator.next
    -          oppositeTime = oppositeEntry.getKey
    -          if (oppositeTime < oppositeLowerBound - allowedLateness) {
    -            //TODO Considering the data out-of-order, we should not remove 
records here.
    -          } else if (oppositeTime >= oppositeLowerBound && oppositeTime <= 
oppositeUpperBound) {
    -            val oppositeRows = oppositeEntry.getValue
    -            var i = 0
    -            if (leftRecord) {
    -              while (i < oppositeRows.size) {
    -                joinFunction.join(record, oppositeRows.get(i), cRowWrapper)
    -                i += 1
    -              }
    -            } else {
    -              while (i < oppositeRows.size) {
    -                joinFunction.join(oppositeRows.get(i), record, cRowWrapper)
    -                i += 1
    -              }
    +      // Join the row with rows from the opposite stream.
    +      val oppositeIterator = oppositeCache.iterator()
    +      while (oppositeIterator.hasNext) {
    +        val oppositeEntry = oppositeIterator.next
    +        val oppositeTime = oppositeEntry.getKey
    +        if (oppositeTime >= oppositeLowerBound && oppositeTime <= 
oppositeUpperBound) {
    +          val oppositeRows = oppositeEntry.getValue
    +          var i = 0
    +          if (leftRow) {
    +            while (i < oppositeRows.size) {
    +              joinFunction.join(row, oppositeRows.get(i), cRowWrapper)
    +              i += 1
    +            }
    +          } else {
    +            while (i < oppositeRows.size) {
    +              joinFunction.join(oppositeRows.get(i), row, cRowWrapper)
    +              i += 1
                 }
    -          } else if (oppositeTime > oppositeUpperBound) {
    -            //TODO If the keys are ordered, can we break here?
               }
             }
    -      } else {
    -        //TODO Need some extra logic here?
    -        LOG.warn(s"$record is out-of-date.")
    +        // We could do the short-cutting optimization here once we get a 
state with ordered keys.
           }
         }
    +    // We need to deal with the late data in the future.
    --- End diff --
    
    Yes, I agree. That would be a bigger effort though, because it should 
involve the other operators as well. So far we do not provide any metrics for 
the relational operators but it would make sense to do so. 


---

Reply via email to