Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4625#discussion_r140262200
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
// Initialize the data caches.
val leftListTypeInfo: TypeInformation[JList[Row]] = new
ListTypeInfo[Row](leftType)
val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
- new MapStateDescriptor[Long, JList[Row]](timeIndicator +
"InnerJoinLeftCache",
- BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
leftListTypeInfo)
+ new MapStateDescriptor[Long, JList[Row]](
+ timeIndicator + "InnerJoinLeftCache",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ leftListTypeInfo)
leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
val rightListTypeInfo: TypeInformation[JList[Row]] = new
ListTypeInfo[Row](rightType)
val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
- new MapStateDescriptor[Long, JList[Row]](timeIndicator +
"InnerJoinRightCache",
- BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
rightListTypeInfo)
+ new MapStateDescriptor[Long, JList[Row]](
+ timeIndicator + "InnerJoinRightCache",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ rightListTypeInfo)
rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
// Initialize the timer states.
val leftTimerStateDesc: ValueStateDescriptor[Long] =
- new ValueStateDescriptor[Long](timeIndicator +
"InnerJoinLeftTimerState",
- classOf[Long])
+ new ValueStateDescriptor[Long](timeIndicator +
"InnerJoinLeftTimerState", classOf[Long])
leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
val rightTimerStateDesc: ValueStateDescriptor[Long] =
- new ValueStateDescriptor[Long](timeIndicator +
"InnerJoinRightTimerState",
- classOf[Long])
+ new ValueStateDescriptor[Long](timeIndicator +
"InnerJoinRightTimerState", classOf[Long])
rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
}
/**
- * Process records from the left stream.
- *
- * @param cRowValue the input record
- * @param ctx the context to register timer or get current time
- * @param out the collector for outputting results
- *
+ * Process rows from the left stream.
*/
override def processElement1(
- cRowValue: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
- getCurrentOperatorTime(ctx)
+ cRowValue: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+ updateOperatorTime(ctx)
+ val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+ val oppositeLowerBound: Long = rowTime - rightRelativeSize
+ val oppositeUpperBound: Long = rowTime + leftRelativeSize
processElement(
cRowValue,
- timeForRecord,
+ rowTime,
ctx,
out,
leftOperatorTime,
+ oppositeLowerBound,
+ oppositeUpperBound,
rightOperatorTime,
rightTimerState,
leftCache,
rightCache,
- true
+ leftRow = true
)
}
/**
- * Process records from the right stream.
- *
- * @param cRowValue the input record
- * @param ctx the context to get current time
- * @param out the collector for outputting results
- *
+ * Process rows from the right stream.
*/
override def processElement2(
- cRowValue: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
- getCurrentOperatorTime(ctx)
+ cRowValue: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+ updateOperatorTime(ctx)
+ val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+ val oppositeLowerBound: Long = rowTime - leftRelativeSize
+ val oppositeUpperBound: Long = rowTime + rightRelativeSize
processElement(
cRowValue,
- timeForRecord,
+ rowTime,
ctx,
out,
rightOperatorTime,
+ oppositeLowerBound,
+ oppositeUpperBound,
leftOperatorTime,
leftTimerState,
rightCache,
leftCache,
- false
+ leftRow = false
)
}
/**
- * Put a record from the input stream into the cache and iterate the
opposite cache to
- * output records meeting the join conditions. If there is no timer set
for the OPPOSITE
+ * Put a row from the input stream into the cache and iterate the
opposite cache to
+ * output join results meeting the conditions. If there is no timer set
for the OPPOSITE
* STREAM, register one.
*/
private def processElement(
- cRowValue: CRow,
- timeForRecord: Long,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow],
- myWatermark: Long,
- oppositeWatermark: Long,
- oppositeTimeState: ValueState[Long],
- recordListCache: MapState[Long, JList[Row]],
- oppositeCache: MapState[Long, JList[Row]],
- leftRecord: Boolean): Unit = {
- if (relativeWindowSize > 0) {
- //TODO Shall we consider adding a method for initialization with the
context and collector?
- cRowWrapper.out = out
-
- val record = cRowValue.row
-
- //TODO Only if the time of the record is greater than the watermark,
can we continue.
- if (timeForRecord >= myWatermark - allowedLateness) {
- val oppositeLowerBound: Long =
- if (leftRecord) timeForRecord - rightRelativeSize else
timeForRecord - leftRelativeSize
-
- val oppositeUpperBound: Long =
- if (leftRecord) timeForRecord + leftRelativeSize else
timeForRecord + rightRelativeSize
-
- // Put the record into the cache for later use.
- val recordList = if (recordListCache.contains(timeForRecord)) {
- recordListCache.get(timeForRecord)
- } else {
- new util.ArrayList[Row]()
- }
- recordList.add(record)
- recordListCache.put(timeForRecord, recordList)
-
- // Register a timer on THE OTHER STREAM to remove records from the
cache once they are
- // expired.
- if (oppositeTimeState.value == 0) {
- registerCleanUpTimer(
- ctx, timeForRecord, oppositeWatermark, oppositeTimeState,
leftRecord, true)
- }
+ cRowValue: CRow,
+ timeForRow: Long,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow],
+ myWatermark: Long,
+ oppositeLowerBound: Long,
+ oppositeUpperBound: Long,
+ oppositeWatermark: Long,
+ oppositeTimeState: ValueState[Long],
+ rowListCache: MapState[Long, JList[Row]],
+ oppositeCache: MapState[Long, JList[Row]],
+ leftRow: Boolean): Unit = {
+ cRowWrapper.out = out
+ val row = cRowValue.row
+ if (!checkRowOutOfDate(timeForRow, myWatermark)) {
--- End diff --
Thanks for your thoughts. I understand your motivation to either emit
complete results for a row or none but I think it is fine to emit partial
results. My reasoning is the following:
1. we should try to produce results which are as close to the original
semantics as possible
2. being late (later than the allowedLateness) does not necessarily mean
that we missed matching results. Maybe there were no matching records in the
matched time frame, so we could still produce exact results.
3. By not storing a row in the cache, the join result becomes more
incomplete. Rows of the other stream that would have produced a complete result
will only produce partial results (which we wanted to avoid by not adding it to
state).
So, I'd rather unnest the conditions.
---