[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174868#comment-16174868
 ] 

ASF GitHub Bot commented on FLINK-6233:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r140262200
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
    @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
         // Initialize the data caches.
         val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
         val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    -      new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
    -        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
    +      new MapStateDescriptor[Long, JList[Row]](
    +        timeIndicator + "InnerJoinLeftCache",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    +        leftListTypeInfo)
         leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
     
         val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
         val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    -      new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
    -        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
    +      new MapStateDescriptor[Long, JList[Row]](
    +        timeIndicator + "InnerJoinRightCache",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    +        rightListTypeInfo)
         rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
     
         // Initialize the timer states.
         val leftTimerStateDesc: ValueStateDescriptor[Long] =
    -      new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
    -        classOf[Long])
    +      new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
         leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
     
         val rightTimerStateDesc: ValueStateDescriptor[Long] =
    -      new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
    -        classOf[Long])
    +      new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
         rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
       }
     
       /**
    -    * Process records from the left stream.
    -    *
    -    * @param cRowValue the input record
    -    * @param ctx       the context to register timer or get current time
    -    * @param out       the collector for outputting results
    -    *
    +    * Process rows from the left stream.
         */
       override def processElement1(
    -    cRowValue: CRow,
    -    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -    out: Collector[CRow]): Unit = {
    -    val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
    -    getCurrentOperatorTime(ctx)
    +      cRowValue: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +    updateOperatorTime(ctx)
    +    val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
    +    val oppositeLowerBound: Long = rowTime - rightRelativeSize
    +    val oppositeUpperBound: Long = rowTime + leftRelativeSize
         processElement(
           cRowValue,
    -      timeForRecord,
    +      rowTime,
           ctx,
           out,
           leftOperatorTime,
    +      oppositeLowerBound,
    +      oppositeUpperBound,
           rightOperatorTime,
           rightTimerState,
           leftCache,
           rightCache,
    -      true
    +      leftRow = true
         )
       }
     
       /**
    -    * Process records from the right stream.
    -    *
    -    * @param cRowValue the input record
    -    * @param ctx       the context to get current time
    -    * @param out       the collector for outputting results
    -    *
    +    * Process rows from the right stream.
         */
       override def processElement2(
    -    cRowValue: CRow,
    -    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -    out: Collector[CRow]): Unit = {
    -    val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
    -    getCurrentOperatorTime(ctx)
    +      cRowValue: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +    updateOperatorTime(ctx)
    +    val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
    +    val oppositeLowerBound: Long = rowTime - leftRelativeSize
    +    val oppositeUpperBound: Long =  rowTime + rightRelativeSize
         processElement(
           cRowValue,
    -      timeForRecord,
    +      rowTime,
           ctx,
           out,
           rightOperatorTime,
    +      oppositeLowerBound,
    +      oppositeUpperBound,
           leftOperatorTime,
           leftTimerState,
           rightCache,
           leftCache,
    -      false
    +      leftRow = false
         )
       }
     
       /**
    -    * Put a record from the input stream into the cache and iterate the 
opposite cache to
    -    * output records meeting the join conditions. If there is no timer set 
for the OPPOSITE
    +    * Put a row from the input stream into the cache and iterate the 
opposite cache to
    +    * output join results meeting the conditions. If there is no timer set 
for the OPPOSITE
         * STREAM, register one.
         */
       private def processElement(
    -    cRowValue: CRow,
    -    timeForRecord: Long,
    -    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    -    out: Collector[CRow],
    -    myWatermark: Long,
    -    oppositeWatermark: Long,
    -    oppositeTimeState: ValueState[Long],
    -    recordListCache: MapState[Long, JList[Row]],
    -    oppositeCache: MapState[Long, JList[Row]],
    -    leftRecord: Boolean): Unit = {
    -    if (relativeWindowSize > 0) {
    -      //TODO Shall we consider adding a method for initialization with the 
context and collector?
    -      cRowWrapper.out = out
    -
    -      val record = cRowValue.row
    -
    -      //TODO Only if the time of the record is greater than the watermark, 
can we continue.
    -      if (timeForRecord >= myWatermark - allowedLateness) {
    -        val oppositeLowerBound: Long =
    -          if (leftRecord) timeForRecord - rightRelativeSize else 
timeForRecord - leftRelativeSize
    -
    -        val oppositeUpperBound: Long =
    -          if (leftRecord) timeForRecord + leftRelativeSize else 
timeForRecord + rightRelativeSize
    -
    -        // Put the record into the cache for later use.
    -        val recordList = if (recordListCache.contains(timeForRecord)) {
    -          recordListCache.get(timeForRecord)
    -        } else {
    -          new util.ArrayList[Row]()
    -        }
    -        recordList.add(record)
    -        recordListCache.put(timeForRecord, recordList)
    -
    -        // Register a timer on THE OTHER STREAM to remove records from the 
cache once they are
    -        // expired.
    -        if (oppositeTimeState.value == 0) {
    -          registerCleanUpTimer(
    -            ctx, timeForRecord, oppositeWatermark, oppositeTimeState, 
leftRecord, true)
    -        }
    +      cRowValue: CRow,
    +      timeForRow: Long,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow],
    +      myWatermark: Long,
    +      oppositeLowerBound: Long,
    +      oppositeUpperBound: Long,
    +      oppositeWatermark: Long,
    +      oppositeTimeState: ValueState[Long],
    +      rowListCache: MapState[Long, JList[Row]],
    +      oppositeCache: MapState[Long, JList[Row]],
    +      leftRow: Boolean): Unit = {
    +    cRowWrapper.out = out
    +    val row = cRowValue.row
    +    if (!checkRowOutOfDate(timeForRow, myWatermark)) {
    --- End diff --
    
    Thanks for your thoughts. I understand your motivation to either emit 
complete results for a row or none but I think it is fine to emit partial 
results. My reasoning is the following:
    
    1. we should try to produce results which are as close to the original 
semantics as possible
    2. being late (later than the allowedLateness) does not necessarily mean 
that we missed matching results. Maybe there were no matching records in the 
matched time frame, so we could still produce exact results.
    3. By not storing a row in the cache, the join result becomes more 
incomplete. Rows of the other stream that would have produced a complete result 
will only produce partial results (which we wanted to avoid by not adding it to 
state).
    
    So, I'd rather unnest the conditions.


> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} only can use rowtime that is a system attribute, the time 
> condition only support bounded time range like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not support 
> unbounded like {{o.rowtime < s.rowtime}} ,  and  should include both two 
> stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () + 
> 1}} should also not be supported.
> An row-time streams join will not be able to handle late data, because this 
> would mean in insert a row into a sorted order shift all other computations. 
> This would be too expensive to maintain. Therefore, we will throw an error if 
> a user tries to use an row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to