[
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311792#comment-16311792
]
ASF GitHub Bot commented on FLINK-7797:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5140#discussion_r159700432
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
---
@@ -344,23 +434,42 @@ abstract class TimeBoundedStreamInnerJoin(
* @param removeLeft whether to remove the left rows
*/
private def removeExpiredRows(
+ collector: EmitAwareCollector,
expirationTime: Long,
- rowCache: MapState[Long, JList[Row]],
+ rowCache: MapState[Long, JList[JTuple2[Row, Boolean]]],
timerState: ValueState[Long],
ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
removeLeft: Boolean): Unit = {
- val keysIterator = rowCache.keys().iterator()
+ val iterator = rowCache.iterator()
var earliestTimestamp: Long = -1L
- var rowTime: Long = 0L
// We remove all expired keys and do not leave the loop early.
// Hence, we do a full pass over the state.
- while (keysIterator.hasNext) {
- rowTime = keysIterator.next
+ while (iterator.hasNext) {
+ val entry = iterator.next
+ val rowTime = entry.getKey
if (rowTime <= expirationTime) {
- keysIterator.remove()
+ if ((joinType == JoinType.RIGHT_OUTER && !removeLeft) ||
--- End diff --
Refactor to
```
if (removeLeft &&
(joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER)) {
val rows = entry.getValue
var i = 0
while (i < rows.size) {
val tuple = rows.get(i)
if (!tuple.f1) {
// Emit a null padding result if the row has never been successfully
joined.
collector.collect(paddingUtil.padLeft(tuple.f0))
}
i += 1
}
} else if (!removeLeft &&
(joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER)) {
val rows = entry.getValue
var i = 0
while (i < rows.size) {
val tuple = rows.get(i)
if (!tuple.f1) {
// Emit a null padding result if the row has never been successfully
joined.
collector.collect(paddingUtil.padRight(tuple.f0))
}
i += 1
}
}
iterator.remove()
```
to reduce the number of conditions.
> Add support for windowed outer joins for streaming tables
> ---------------------------------------------------------
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: Fabian Hueske
> Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER
> joins.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)