[ https://issues.apache.org/jira/browse/FLINK-22075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17318989#comment-17318989 ]
lincoln lee commented on FLINK-22075:
-------------------------------------

[~jamii] thanks for reporting this issue! I spent some time investigating it, so let me try to explain how the 'wrong' results are produced:

Firstly, the query contains an equality join condition on the rowtime column (which defines a watermark), so it matches the interval join syntax, and its execution differs from a regular join (the main difference is that an interval join relies on watermarks and does not produce retractions).
[a sql way of interval join|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#interval-joins]
[datastream interval join|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html#interval-join]

Secondly, your test case defines a 5-second delayed watermark strategy, and the test data contains out-of-order events that are more than 5s behind. These events hit the late event rule in the TimeIntervalJoin operator and produce an 'unexpected' null join result. You can either remove the watermark definition or enlarge the watermark delay (make it larger than the max delay of the out-of-order events).

Lastly, the batch mode testing does not work properly: passing `-Dexecution.runtime-mode=BATCH` cannot override the EnvironmentSettings, which are hardcoded in Demo.java
{code:java}
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();{code}
so the testing is still in streaming mode.
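To make the second point more concrete, here is a minimal plain-Java sketch of how a watermark delay decides which out-of-order events count as late. It is a simplified model and not the actual TimeIntervalJoin code: the class `LateEventSketch`, the method `lateEvents`, and the sample timestamps are all hypothetical, and it assumes a punctuated watermark that trails each element by a fixed delay and never moves backwards.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of a late-event check (hypothetical, NOT Flink's operator code).
 * An event is treated as late when its timestamp is at or behind the current watermark.
 */
final class LateEventSketch {

    /** Returns the timestamps (ms) that arrive at or behind the current watermark. */
    static List<Long> lateEvents(long[] timestampsMs, long watermarkDelayMs) {
        List<Long> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // no watermark emitted yet
        for (long ts : timestampsMs) {
            if (ts <= watermark) {
                late.add(ts); // in the left join these rows would get a null-padded result
            }
            // Assumed punctuated watermark: trails each element by the delay, only advances.
            watermark = Math.max(watermark, ts - watermarkDelayMs);
        }
        return late;
    }

    public static void main(String[] args) {
        // Made-up arrival order; the worst out-of-order gap is 6s (8s before 2s, 9s before 3s).
        long[] ts = {0L, 8_000L, 2_000L, 9_000L, 3_000L, 15_000L, 12_000L, 20_000L};
        System.out.println("delay 5s:  late = " + lateEvents(ts, 5_000L));
        System.out.println("delay 10s: late = " + lateEvents(ts, 10_000L));
    }
}
```

With the sample data, a 5s delay is smaller than the worst out-of-order gap, so some events fall behind the watermark, while a 10s delay covers the gap and nothing is late. This mirrors the suggestion above to make the watermark delay larger than the max delay of the out-of-order events.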

For testing this, I added a similar case into `IntervalJoinITCase` as below:
{code:java}
@Test
def testRowTimeSelfLeftJoin(): Unit = {
  val sqlQuery =
    """
      |SELECT t1.key, t2.key
      |FROM T1 AS t1 LEFT OUTER JOIN T1 AS t2 ON
      |  t1.key = t2.key AND
      |  t1.rowtime = t2.rowtime
    """.stripMargin

  val data = new mutable.MutableList[(Int, Timestamp)]
  val random = new Random(42)
  val max = 20
  for (i <- 0 until max) {
    val sec = ((50 * i) / max) + random.nextInt(9)
    data.+=((i, Timestamp.valueOf(s"2021-01-01 00:00:${sec}.000")))
  }

  val t1 = env.fromCollection(data)
    .assignTimestampsAndWatermarks(new Row2WatermarkExtractor)
    .toTable(tEnv, 'key, 'rowtime.rowtime)
  tEnv.registerTable("T1", t1)

  val sink = new TestingAppendSink
  val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
  result.addSink(sink)
  env.execute()

  val expected = (0 until max).map(i => s"$i,$i").toList
  assertEquals(expected.sorted, sink.getAppendResults.sorted)
}

private class Row2WatermarkExtractor
  extends AssignerWithPunctuatedWatermarks[(Int, Timestamp)] {

  override def checkAndGetNextWatermark(
      lastElement: (Int, Timestamp),
      extractedTimestamp: Long): Watermark = {
    // eq to watermark's offset in sql ddl, reproduces the 'null' result
    // if set to `- 5000` (5 seconds delayed watermark strategy)
    new Watermark(extractedTimestamp - 10000)
  }

  override def extractTimestamp(
      element: (Int, Timestamp),
      previousElementTimestamp: Long): Long = {
    element._2.getTime
  }
}
{code}

> Incorrect null outputs in left join
> -----------------------------------
>
>                 Key: FLINK-22075
>                 URL: https://issues.apache.org/jira/browse/FLINK-22075
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.2
>         Environment: https://github.com/jamii/streaming-consistency/blob/4e5d144dacf85e512bdc7afd77d031b5974d733e/pkgs.nix#L25-L46
> ```
> [nix-shell:~/streaming-consistency/flink]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/flink]$ flink --version
> Version: 1.12.2, Commit ID: 4dedee0
> [nix-shell:~/streaming-consistency/flink]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: /nix/var/nix/profiles/per-user/root/channels/nixos
> ```
>            Reporter: Jamie Brandon
>            Assignee: lincoln lee
>            Priority: Critical
>             Fix For: 1.13.0
>
>
> I'm left joining a table with itself [here](https://github.com/jamii/streaming-consistency/blob/4e5d144dacf85e512bdc7afd77d031b5974d733e/flink/src/main/java/Demo.java#L55-L66).
> The output should have no nulls, or at least emit nulls and then retract them. Instead I see:
> ```
> jamie@machine:~/streaming-consistency/flink$ wc -l tmp/outer_join_with_time
> 100000 tmp/outer_join_with_time
> jamie@machine:~/streaming-consistency/flink$ grep -c insert tmp/outer_join_with_time
> 100000
> jamie@machine:~/streaming-consistency/flink$ grep -c 'null' tmp/outer_join_with_time
> 16943
> ```
> ~17% of the outputs are incorrect and never retracted.
> [Full output](https://gist.githubusercontent.com/jamii/983fee41609b1425fe7fa59d3249b249/raw/069b9dcd4faf9f6113114381bc7028c6642ca787/gistfile1.txt)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)