Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4625#discussion_r139446305
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
// Initialize the data caches.
val leftListTypeInfo: TypeInformation[JList[Row]] = new
ListTypeInfo[Row](leftType)
val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
- new MapStateDescriptor[Long, JList[Row]](timeIndicator +
"InnerJoinLeftCache",
- BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
leftListTypeInfo)
+ new MapStateDescriptor[Long, JList[Row]](
+ timeIndicator + "InnerJoinLeftCache",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ leftListTypeInfo)
leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
val rightListTypeInfo: TypeInformation[JList[Row]] = new
ListTypeInfo[Row](rightType)
val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
- new MapStateDescriptor[Long, JList[Row]](timeIndicator +
"InnerJoinRightCache",
- BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
rightListTypeInfo)
+ new MapStateDescriptor[Long, JList[Row]](
+ timeIndicator + "InnerJoinRightCache",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ rightListTypeInfo)
rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
// Initialize the timer states.
val leftTimerStateDesc: ValueStateDescriptor[Long] =
- new ValueStateDescriptor[Long](timeIndicator +
"InnerJoinLeftTimerState",
- classOf[Long])
+ new ValueStateDescriptor[Long](timeIndicator +
"InnerJoinLeftTimerState", classOf[Long])
leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
val rightTimerStateDesc: ValueStateDescriptor[Long] =
- new ValueStateDescriptor[Long](timeIndicator +
"InnerJoinRightTimerState",
- classOf[Long])
+ new ValueStateDescriptor[Long](timeIndicator +
"InnerJoinRightTimerState", classOf[Long])
rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
}
/**
- * Process records from the left stream.
- *
- * @param cRowValue the input record
- * @param ctx the context to register timer or get current time
- * @param out the collector for outputting results
- *
+ * Process rows from the left stream.
*/
override def processElement1(
- cRowValue: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
- getCurrentOperatorTime(ctx)
+ cRowValue: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+ updateOperatorTime(ctx)
+ val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+ val oppositeLowerBound: Long = rowTime - rightRelativeSize
+ val oppositeUpperBound: Long = rowTime + leftRelativeSize
processElement(
cRowValue,
- timeForRecord,
+ rowTime,
ctx,
out,
leftOperatorTime,
+ oppositeLowerBound,
+ oppositeUpperBound,
rightOperatorTime,
rightTimerState,
leftCache,
rightCache,
- true
+ leftRow = true
)
}
/**
- * Process records from the right stream.
- *
- * @param cRowValue the input record
- * @param ctx the context to get current time
- * @param out the collector for outputting results
- *
+ * Process rows from the right stream.
*/
override def processElement2(
- cRowValue: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
- getCurrentOperatorTime(ctx)
+ cRowValue: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+ updateOperatorTime(ctx)
+ val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+ val oppositeLowerBound: Long = rowTime - leftRelativeSize
+ val oppositeUpperBound: Long = rowTime + rightRelativeSize
processElement(
cRowValue,
- timeForRecord,
+ rowTime,
ctx,
out,
rightOperatorTime,
+ oppositeLowerBound,
+ oppositeUpperBound,
leftOperatorTime,
leftTimerState,
rightCache,
leftCache,
- false
+ leftRow = false
)
}
/**
- * Put a record from the input stream into the cache and iterate the
opposite cache to
- * output records meeting the join conditions. If there is no timer set
for the OPPOSITE
+ * Put a row from the input stream into the cache and iterate the
opposite cache to
+ * output join results meeting the conditions. If there is no timer set
for the OPPOSITE
* STREAM, register one.
*/
private def processElement(
- cRowValue: CRow,
- timeForRecord: Long,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow],
- myWatermark: Long,
- oppositeWatermark: Long,
- oppositeTimeState: ValueState[Long],
- recordListCache: MapState[Long, JList[Row]],
- oppositeCache: MapState[Long, JList[Row]],
- leftRecord: Boolean): Unit = {
- if (relativeWindowSize > 0) {
- //TODO Shall we consider adding a method for initialization with the
context and collector?
- cRowWrapper.out = out
-
- val record = cRowValue.row
-
- //TODO Only if the time of the record is greater than the watermark,
can we continue.
- if (timeForRecord >= myWatermark - allowedLateness) {
- val oppositeLowerBound: Long =
- if (leftRecord) timeForRecord - rightRelativeSize else
timeForRecord - leftRelativeSize
-
- val oppositeUpperBound: Long =
- if (leftRecord) timeForRecord + leftRelativeSize else
timeForRecord + rightRelativeSize
-
- // Put the record into the cache for later use.
- val recordList = if (recordListCache.contains(timeForRecord)) {
- recordListCache.get(timeForRecord)
- } else {
- new util.ArrayList[Row]()
- }
- recordList.add(record)
- recordListCache.put(timeForRecord, recordList)
-
- // Register a timer on THE OTHER STREAM to remove records from the
cache once they are
- // expired.
- if (oppositeTimeState.value == 0) {
- registerCleanUpTimer(
- ctx, timeForRecord, oppositeWatermark, oppositeTimeState,
leftRecord, true)
- }
+ cRowValue: CRow,
+ timeForRow: Long,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow],
+ myWatermark: Long,
+ oppositeLowerBound: Long,
+ oppositeUpperBound: Long,
+ oppositeWatermark: Long,
+ oppositeTimeState: ValueState[Long],
+ rowListCache: MapState[Long, JList[Row]],
+ oppositeCache: MapState[Long, JList[Row]],
+ leftRow: Boolean): Unit = {
+ cRowWrapper.out = out
+ val row = cRowValue.row
+ if (!checkRowOutOfDate(timeForRow, myWatermark)) {
--- End diff --
I thought about this condition again. I think we need to distinguish two
cases:
1. **Storing a record in state**: We need to put a row into state if we
must expect qualifying rows on the other side to arrive, i.e., the time of
the other input is before the upper window boundary of the current row (adjusted by
allowed lateness). Hence, we also need to check the other input's watermark
when accessing (inserting into) the cache of the current input (same pattern as for
cleaning state).
2. **Joining with other side**: We need to iterate over the other input's
cache if there might be qualifying rows in the cache. Since we clean the other
cache based on the watermark of the current input, we need to check the rowtime
against the watermark of the current input (adjusted by allowed lateness). This
is the check that you have right now.
Both might be true, just one, or none depending on the progress of the
watermarks and the window boundaries.
So, I would move the code to insert the row into the cache out of this
condition and wrap it in another condition that checks against the other
input's watermark.
Please check if this makes sense. It's quite easy to get confused ;-)
---