GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4625#discussion_r139434334
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
// Initialize the data caches.
val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType)
val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
- new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache",
- BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo)
+ new MapStateDescriptor[Long, JList[Row]](
+ timeIndicator + "InnerJoinLeftCache",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ leftListTypeInfo)
leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType)
val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
- new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache",
- BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo)
+ new MapStateDescriptor[Long, JList[Row]](
+ timeIndicator + "InnerJoinRightCache",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ rightListTypeInfo)
rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
// Initialize the timer states.
val leftTimerStateDesc: ValueStateDescriptor[Long] =
- new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState",
- classOf[Long])
+ new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long])
leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
val rightTimerStateDesc: ValueStateDescriptor[Long] =
- new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState",
- classOf[Long])
+ new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long])
rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
}
/**
- * Process records from the left stream.
- *
- * @param cRowValue the input record
- * @param ctx the context to register timer or get current time
- * @param out the collector for outputting results
- *
+ * Process rows from the left stream.
*/
override def processElement1(
- cRowValue: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
- getCurrentOperatorTime(ctx)
+ cRowValue: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+ updateOperatorTime(ctx)
+ val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+ val oppositeLowerBound: Long = rowTime - rightRelativeSize
+ val oppositeUpperBound: Long = rowTime + leftRelativeSize
processElement(
cRowValue,
- timeForRecord,
+ rowTime,
ctx,
out,
leftOperatorTime,
+ oppositeLowerBound,
+ oppositeUpperBound,
rightOperatorTime,
rightTimerState,
leftCache,
rightCache,
- true
+ leftRow = true
)
}
/**
- * Process records from the right stream.
- *
- * @param cRowValue the input record
- * @param ctx the context to get current time
- * @param out the collector for outputting results
- *
+ * Process rows from the right stream.
*/
override def processElement2(
- cRowValue: CRow,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
- getCurrentOperatorTime(ctx)
+ cRowValue: CRow,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+ updateOperatorTime(ctx)
+ val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+ val oppositeLowerBound: Long = rowTime - leftRelativeSize
+ val oppositeUpperBound: Long = rowTime + rightRelativeSize
processElement(
cRowValue,
- timeForRecord,
+ rowTime,
ctx,
out,
rightOperatorTime,
+ oppositeLowerBound,
+ oppositeUpperBound,
leftOperatorTime,
leftTimerState,
rightCache,
leftCache,
- false
+ leftRow = false
)
}
/**
- * Put a record from the input stream into the cache and iterate the opposite cache to
- * output records meeting the join conditions. If there is no timer set for the OPPOSITE
+ * Put a row from the input stream into the cache and iterate the opposite cache to
+ * output join results meeting the conditions. If there is no timer set for the OPPOSITE
* STREAM, register one.
*/
private def processElement(
- cRowValue: CRow,
- timeForRecord: Long,
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- out: Collector[CRow],
- myWatermark: Long,
- oppositeWatermark: Long,
- oppositeTimeState: ValueState[Long],
- recordListCache: MapState[Long, JList[Row]],
- oppositeCache: MapState[Long, JList[Row]],
- leftRecord: Boolean): Unit = {
- if (relativeWindowSize > 0) {
- //TODO Shall we consider adding a method for initialization with the context and collector?
- cRowWrapper.out = out
-
- val record = cRowValue.row
-
- //TODO Only if the time of the record is greater than the watermark, can we continue.
- if (timeForRecord >= myWatermark - allowedLateness) {
- val oppositeLowerBound: Long =
- if (leftRecord) timeForRecord - rightRelativeSize else timeForRecord - leftRelativeSize
-
- val oppositeUpperBound: Long =
- if (leftRecord) timeForRecord + leftRelativeSize else timeForRecord + rightRelativeSize
-
- // Put the record into the cache for later use.
- val recordList = if (recordListCache.contains(timeForRecord)) {
- recordListCache.get(timeForRecord)
- } else {
- new util.ArrayList[Row]()
- }
- recordList.add(record)
- recordListCache.put(timeForRecord, recordList)
-
- // Register a timer on THE OTHER STREAM to remove records from the cache once they are
- // expired.
- if (oppositeTimeState.value == 0) {
- registerCleanUpTimer(
- ctx, timeForRecord, oppositeWatermark, oppositeTimeState, leftRecord, true)
- }
+ cRowValue: CRow,
+ timeForRow: Long,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ out: Collector[CRow],
+ myWatermark: Long,
+ oppositeLowerBound: Long,
+ oppositeUpperBound: Long,
+ oppositeWatermark: Long,
+ oppositeTimeState: ValueState[Long],
+ rowListCache: MapState[Long, JList[Row]],
+ oppositeCache: MapState[Long, JList[Row]],
+ leftRow: Boolean): Unit = {
+ cRowWrapper.out = out
+ val row = cRowValue.row
+ if (!checkRowOutOfDate(timeForRow, myWatermark)) {
+ // Put the row into the cache for later use.
+ var rowList = rowListCache.get(timeForRow)
+ if (null == rowList) {
+ rowList = new ArrayList[Row](1)
+ }
+ rowList.add(row)
+ rowListCache.put(timeForRow, rowList)
+ // Register a timer on THE OPPOSITE STREAM to remove rows from the cache once they are
+ // expired.
+ if (oppositeTimeState.value == 0) {
+ registerCleanUpTimer(
+ ctx, timeForRow, oppositeWatermark, oppositeTimeState, leftRow, firstTimer = true)
+ }
- // Join the record with records from the opposite stream.
- val oppositeIterator = oppositeCache.iterator()
- var oppositeEntry: Entry[Long, util.List[Row]] = null
- var oppositeTime: Long = 0L;
- while (oppositeIterator.hasNext) {
- oppositeEntry = oppositeIterator.next
- oppositeTime = oppositeEntry.getKey
- if (oppositeTime < oppositeLowerBound - allowedLateness) {
- //TODO Considering the data out-of-order, we should not remove records here.
- } else if (oppositeTime >= oppositeLowerBound && oppositeTime <= oppositeUpperBound) {
- val oppositeRows = oppositeEntry.getValue
- var i = 0
- if (leftRecord) {
- while (i < oppositeRows.size) {
- joinFunction.join(record, oppositeRows.get(i), cRowWrapper)
- i += 1
- }
- } else {
- while (i < oppositeRows.size) {
- joinFunction.join(oppositeRows.get(i), record, cRowWrapper)
- i += 1
- }
+ // Join the row with rows from the opposite stream.
+ val oppositeIterator = oppositeCache.iterator()
+ while (oppositeIterator.hasNext) {
+ val oppositeEntry = oppositeIterator.next
+ val oppositeTime = oppositeEntry.getKey
+ if (oppositeTime >= oppositeLowerBound && oppositeTime <= oppositeUpperBound) {
+ val oppositeRows = oppositeEntry.getValue
+ var i = 0
+ if (leftRow) {
+ while (i < oppositeRows.size) {
+ joinFunction.join(row, oppositeRows.get(i), cRowWrapper)
+ i += 1
+ }
+ } else {
+ while (i < oppositeRows.size) {
+ joinFunction.join(oppositeRows.get(i), row, cRowWrapper)
+ i += 1
}
- } else if (oppositeTime > oppositeUpperBound) {
- //TODO If the keys are ordered, can we break here?
}
}
- } else {
- //TODO Need some extra logic here?
- LOG.warn(s"$record is out-of-date.")
+ // We could do the short-cutting optimization here once we get a state with ordered keys.
}
}
+ // We need to deal with the late data in the future.
}
/**
- * Register a timer for cleaning up records in a specified time.
+ * Register a timer for cleaning up rows in a specified time.
*
* @param ctx the context to register timer
- * @param timeForRecord time for the input record
+ * @param rowTime time for the input row
* @param oppositeWatermark watermark of the opposite stream
* @param timerState stores the timestamp for the next timer
- * @param leftRecord record from the left or the right stream
+ * @param leftRow whether this row comes from the left stream
* @param firstTimer whether this is the first timer
*/
private def registerCleanUpTimer(
- ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
- timeForRecord: Long,
- oppositeWatermark: Long,
- timerState: ValueState[Long],
- leftRecord: Boolean,
- firstTimer: Boolean): Unit = {
- val cleanUpTime = timeForRecord + (if (leftRecord) leftRelativeSize else rightRelativeSize) +
- allowedLateness + 1
- registerTimer(ctx, !leftRecord, cleanUpTime)
- LOG.debug(s"Register a clean up timer on the ${if (leftRecord) "RIGHT" else "LEFT"} state:"
- + s" timeForRecord = ${timeForRecord}, cleanUpTime = ${cleanUpTime}, oppositeWatermark = " +
- s"${oppositeWatermark}")
- timerState.update(cleanUpTime)
- if (cleanUpTime <= oppositeWatermark + allowedLateness && firstTimer) {
- backPressureSuggestion =
- if (leftRecord) (oppositeWatermark + allowedLateness - cleanUpTime)
- else -(oppositeWatermark + allowedLateness - cleanUpTime)
- LOG.warn("The clean timer for the " +
- s"${if (leftRecord) "left" else "right"}" +
- s" stream is lower than ${if (leftRecord) "right" else "left"} watermark." +
- s" requiredTime = ${formatTime(cleanUpTime)}, watermark = ${formatTime(oppositeWatermark)},"
- + s"backPressureSuggestion = " + s"${backPressureSuggestion}.")
+ ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+ rowTime: Long,
+ oppositeWatermark: Long,
+ timerState: ValueState[Long],
+ leftRow: Boolean,
+ firstTimer: Boolean): Unit = {
+ val cleanupTime = if (leftRow) {
+ rowTime + leftRelativeSize + cleanupDelay + allowedLateness + 1
+ } else {
+ rowTime + rightRelativeSize + cleanupDelay + allowedLateness + 1
}
+ registerTimer(ctx, !leftRow, cleanupTime)
+ LOG.debug(s"Register a clean up timer on the ${if (leftRow) "RIGHT" else "LEFT"} state:"
+ + s" timeForRow = ${rowTime}, cleanupTime should be ${cleanupTime - cleanupDelay}," +
+ s" but delayed to ${cleanupTime}," +
+ s" oppositeWatermark = ${oppositeWatermark}")
+ timerState.update(cleanupTime)
+ //if cleanupTime <= oppositeWatermark + allowedLateness && firstTimer, we may set the
+ // backPressureSuggestion =
+ // if (leftRow) (oppositeWatermark + allowedLateness - cleanupTime)
+ // else -(oppositeWatermark + allowedLateness - cleanupTime)
}
-
/**
* Called when a registered timer is fired.
- * Remove records which are earlier than the expiration time,
- * and register a new timer for the earliest remaining records.
+ * Remove rows whose timestamps are earlier than the expiration time,
+ * and register a new timer for the remaining rows.
*
* @param timestamp the timestamp of the timer
* @param ctx the context to register timer or get current time
* @param out the collector for returning result values
*/
override def onTimer(
- timestamp: Long,
- ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
- out: Collector[CRow]): Unit = {
- getCurrentOperatorTime(ctx)
- //TODO In the future, we should separate the left and right watermarks. Otherwise, the
- //TODO registered timer of the faster stream will be delayed, even if the watermarks have
- //TODO already been emitted by the source.
+ timestamp: Long,
+ ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
+ updateOperatorTime(ctx)
+ // In the future, we should separate the left and right watermarks. Otherwise, the
+ // registered timer of the faster stream will be delayed, even if the watermarks have
+ // already been emitted by the source.
if (leftTimerState.value == timestamp) {
val rightExpirationTime = leftOperatorTime - rightRelativeSize - allowedLateness - 1
- removeExpiredRecords(
- timestamp,
+ removeExpiredRows(
rightExpirationTime,
leftOperatorTime,
rightCache,
leftTimerState,
ctx,
- false
+ removeLeft = false
)
}
if (rightTimerState.value == timestamp) {
val leftExpirationTime = rightOperatorTime - leftRelativeSize - allowedLateness - 1
- removeExpiredRecords(
- timestamp,
+ removeExpiredRows(
leftExpirationTime,
rightOperatorTime,
leftCache,
rightTimerState,
ctx,
- true
+ removeLeft = true
)
}
}
/**
- * Remove the expired records. Register a new timer if the cache still holds records
+ * Remove the expired rows. Register a new timer if the cache still holds valid rows
* after the cleaning up.
+ *
+ * @param expirationTime the expiration time for this cache
+ * @param oppositeWatermark the watermark of the opposite stream
+ * @param rowCache the row cache
+ * @param timerState timer state for the opposite stream
+ * @param ctx the context to register the cleanup timer
+ * @param removeLeft whether to remove the left rows
*/
- private def removeExpiredRecords(
- timerFiringTime: Long,
- expirationTime: Long,
- oppositeWatermark: Long,
- recordCache: MapState[Long, JList[Row]],
- timerState: ValueState[Long],
- ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
- removeLeft: Boolean): Unit = {
+ private def removeExpiredRows(
+ expirationTime: Long,
+ oppositeWatermark: Long,
+ rowCache: MapState[Long, JList[Row]],
+ timerState: ValueState[Long],
+ ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
+ removeLeft: Boolean): Unit = {
- val keysIterator = recordCache.keys().iterator()
+ val keysIterator = rowCache.keys().iterator()
// Search for expired timestamps.
// If we find a non-expired timestamp, remember the timestamp and leave the loop.
// This way we find all expired timestamps if they are sorted without doing a full pass.
var earliestTimestamp: Long = -1L
- var recordTime: Long = 0L
+ var rowTime: Long = 0L
while (keysIterator.hasNext) {
- //TODO The "short-circuit" code was commented, because when using a StateMap with
- //TODO unordered keys, the cache will grow indefinitely!
- // && earliestTimestamp < 0) {
- recordTime = keysIterator.next
- if (recordTime <= expirationTime) {
- // TODO Not sure if we can remove records directly.
+ rowTime = keysIterator.next
+ if (rowTime <= expirationTime) {
keysIterator.remove()
} else {
// We find the earliest timestamp that is still valid.
- if (recordTime < earliestTimestamp || earliestTimestamp < 0) {
- earliestTimestamp = recordTime
+ if (rowTime < earliestTimestamp || earliestTimestamp < 0) {
+ earliestTimestamp = rowTime
}
}
}
// If the cache contains non-expired timestamps, register a new timer.
// Otherwise clear the states.
if (earliestTimestamp > 0) {
- registerCleanUpTimer(ctx, earliestTimestamp, oppositeWatermark, timerState, removeLeft, false)
+ registerCleanUpTimer(
+ ctx,
+ earliestTimestamp,
--- End diff --
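I'd apply the `cleanupDelay` here, i.e. only when the cleanup timer is re-registered after the expired rows have been removed, instead of adding it unconditionally in `registerCleanUpTimer`. The first timer (registered from `processElement` with `firstTimer = true`) does not need it.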
---