[ https://issues.apache.org/jira/browse/FLINK-22075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17318989#comment-17318989 ]

lincoln lee commented on FLINK-22075:
-------------------------------------

[~jamii] thanks for reporting this issue!


I spent some time investigating this; let me try to explain how the 'wrong' results are produced:
First, the query contains an equality join condition on the rowtime column (which has a watermark defined on it), so it matches the interval join syntax and is executed differently from a regular join. The main difference is that an interval join relies on the watermark and does not produce retractions.
[SQL interval join|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#interval-joins]
[DataStream interval join|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html#interval-join]
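For contrast, here is a simplified model of roughly what a regular (non-interval) left join does, which is the behaviour the report expected: a left row without a match is emitted null-padded, and when the match shows up later that row is retracted and replaced. This is an illustration under stated assumptions, not Flink's join operator: keys are assumed unique per side, and the class name, the "L:<key>"/"R:<key>" arrival encoding and the "+I"/"-D" output strings are all hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RegularLeftJoinChangelog {
    // Simplified model of a regular LEFT JOIN on a key that is unique per side.
    // Input: arrivals in processing order, encoded as "L:<key>" or "R:<key>" (hypothetical encoding).
    // Output: changelog entries, "+I[...]" for an insert and "-D[...]" for a retraction.
    static List<String> run(List<String> arrivals) {
        Set<String> right = new HashSet<>();
        Set<String> nullPadded = new HashSet<>(); // left keys currently emitted as [key, null]
        List<String> changelog = new ArrayList<>();
        for (String arrival : arrivals) {
            String side = arrival.substring(0, 1);
            String key = arrival.substring(2);
            if (side.equals("L")) {
                if (right.contains(key)) {
                    changelog.add("+I[" + key + ", " + key + "]");
                } else {
                    // No match yet: emit a null-padded row that may be retracted later.
                    nullPadded.add(key);
                    changelog.add("+I[" + key + ", null]");
                }
            } else {
                right.add(key);
                if (nullPadded.remove(key)) {
                    // Retract the earlier null-padded row, then emit the joined row.
                    changelog.add("-D[" + key + ", null]");
                    changelog.add("+I[" + key + ", " + key + "]");
                }
            }
        }
        return changelog;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("L:0", "R:0")));
        System.out.println(run(List.of("R:1", "L:1")));
    }
}
{code}

An interval join, by contrast, is append-only: in this model the `-D` step has no counterpart, so a null-padded row, once emitted, stays in the output.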

Second, your test case defines a watermark strategy delayed by 5 seconds, but the test data contains events that are out of order by more than 5 seconds. Such events hit the late-event rule in the TimeIntervalJoin operator and produce an 'unexpected' null join result. You can either remove the watermark definition or increase the watermark delay, making it larger than the maximum out-of-orderness of the events.
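The late-event rule can be illustrated with a deliberately simplified model; this is not the actual `TimeIntervalJoin` code. Assumptions: the watermark trails the largest timestamp seen so far by the configured delay; a row whose timestamp is at or behind the current watermark is late; and because both inputs of the self-join carry the same rows, a late row's right-side copy is dropped too, so the left row is emitted once, padded with null, and never retracted. `LateRowSelfJoinModel` and `simulate` are hypothetical names.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class LateRowSelfJoinModel {
    // Simplified model (NOT Flink's TimeIntervalJoin) of
    //   T1 LEFT JOIN T1 ON t1.key = t2.key AND t1.rowtime = t2.rowtime
    // with a watermark that trails the max timestamp seen so far by delayMs.
    static List<String> simulate(int[] keys, long[] timestamps, long delayMs) {
        List<String> out = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < keys.length; i++) {
            long ts = timestamps[i];
            if (ts <= watermark) {
                // Late: the row cannot be cached and its right-side copy is dropped as well,
                // so the left row is emitted once, null-padded, and never retracted.
                out.add(keys[i] + ",null");
            } else {
                // On time: both copies are cached and the equi-join matches.
                out.add(keys[i] + "," + keys[i]);
            }
            // Advance the watermark after the element (like a per-element punctuated assigner).
            watermark = Math.max(watermark, ts - delayMs);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 2};
        long[] ts = {0L, 10_000L, 3_000L}; // the third event is 7 s behind the max seen
        System.out.println(simulate(keys, ts, 5_000L));
        System.out.println(simulate(keys, ts, 10_000L));
    }
}
{code}

With the 5-second delay the third event trails the maximum by 7 seconds and comes out as `2,null`; with a 10-second delay all three rows join. In this model a row is late exactly when it trails the maximum seen so far by at least the delay, which is why the delay has to be strictly larger than the maximum out-of-orderness.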

Lastly, the batch-mode test does not work as intended: passing `-Dexecution.runtime-mode=BATCH` cannot override the EnvironmentSettings that are hard-coded in Demo.java:
{code:java}
EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
{code}
so the test still runs in streaming mode.

To verify this, I added a similar case to `IntervalJoinITCase`:
{code:java}
  @Test
  def testRowTimeSelfLeftJoin(): Unit = {
    val sqlQuery =
      """
        |SELECT t1.key, t2.key
        |FROM T1 AS t1 LEFT OUTER JOIN T1 AS t2 ON
        | t1.key = t2.key AND
        | t1.rowtime = t2.rowtime
      """.stripMargin

    val data = new mutable.MutableList[(Int, Timestamp)]
    val random = new Random(42)
    val max = 20
    for (i <- 0 until max) {
      val sec = ((50 * i) / max) + random.nextInt(9)
      data.+=((i, Timestamp.valueOf(s"2021-01-01 00:00:${sec}.000")))
    }
    val t1 = env.fromCollection(data)
      .assignTimestampsAndWatermarks(new Row2WatermarkExtractor)
      .toTable(tEnv, 'key, 'rowtime.rowtime)
    tEnv.registerTable("T1", t1)

    val sink = new TestingAppendSink
    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    result.addSink(sink)
    env.execute()
    val expected = (0 until max).map(i => s"$i,$i").toList
    assertEquals(expected.sorted, sink.getAppendResults.sorted)
  }

private class Row2WatermarkExtractor
  extends AssignerWithPunctuatedWatermarks[(Int, Timestamp)] {

  override def checkAndGetNextWatermark(
      lastElement: (Int, Timestamp),
      extractedTimestamp: Long): Watermark = {
    // Equivalent to the watermark offset in the SQL DDL; setting this to `- 5000`
    // (a 5-second-delayed watermark strategy) reproduces the 'null' results.
    new Watermark(extractedTimestamp - 10000)
  }

  override def extractTimestamp(
      element: (Int, Timestamp),
      previousElementTimestamp: Long): Long = {
    element._2.getTime
  }
}
{code}
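To see why a 10-second offset is enough for this data, the generated timestamps can be replayed outside Flink. The sketch below mirrors the `sec` formula of `testRowTimeSelfLeftJoin` and measures how far each element trails the largest second seen before it. It assumes `scala.util.Random(42)` delegates to `java.util.Random(42)` (so `nextInt(9)` yields the same sequence); the class and method names are hypothetical.

{code:java}
import java.util.Random;

public class TestDataOutOfOrderness {
    // Mirrors: val sec = ((50 * i) / max) + random.nextInt(9), with a Random seeded by `seed`.
    static int[] seconds(int max, long seed) {
        Random random = new Random(seed);
        int[] secs = new int[max];
        for (int i = 0; i < max; i++) {
            secs[i] = (50 * i) / max + random.nextInt(9);
        }
        return secs;
    }

    // Largest amount (in seconds) by which an element trails the max value seen before it.
    // Expects a non-empty array.
    static int maxOutOfOrderness(int[] secs) {
        int maxSeen = secs[0];
        int worst = 0;
        for (int i = 1; i < secs.length; i++) {
            worst = Math.max(worst, maxSeen - secs[i]);
            maxSeen = Math.max(maxSeen, secs[i]);
        }
        return worst;
    }

    public static void main(String[] args) {
        int[] secs = seconds(20, 42L);
        System.out.println("max out-of-orderness: " + maxOutOfOrderness(secs) + " s");
    }
}
{code}

Independent of the seed, the base term `(50 * i) / 20` grows by at least 2 seconds per element and the jitter is at most 8 seconds, so no element can trail an earlier one by more than 6 seconds. That stays below the 10-second offset, while a 5-second offset can be exceeded depending on the drawn values.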

> Incorrect null outputs in left join
> -----------------------------------
>
>                 Key: FLINK-22075
>                 URL: https://issues.apache.org/jira/browse/FLINK-22075
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.2
>         Environment: 
> https://github.com/jamii/streaming-consistency/blob/4e5d144dacf85e512bdc7afd77d031b5974d733e/pkgs.nix#L25-L46
> ```
> [nix-shell:~/streaming-consistency/flink]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/flink]$ flink --version
> Version: 1.12.2, Commit ID: 4dedee0
> [nix-shell:~/streaming-consistency/flink]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos
> ```
>            Reporter: Jamie Brandon
>            Assignee: lincoln lee
>            Priority: Critical
>             Fix For: 1.13.0
>
>
> I'm left joining a table with itself 
> [here](https://github.com/jamii/streaming-consistency/blob/4e5d144dacf85e512bdc7afd77d031b5974d733e/flink/src/main/java/Demo.java#L55-L66).
>  The output should have no nulls, or at least emit nulls and then retract 
> them. Instead I see:
> ```
> jamie@machine:~/streaming-consistency/flink$ wc -l tmp/outer_join_with_time
> 100000 tmp/outer_join_with_time
> jamie@machine:~/streaming-consistency/flink$ grep -c insert tmp/outer_join_with_time
> 100000
> jamie@machine:~/streaming-consistency/flink$ grep -c 'null' tmp/outer_join_with_time
> 16943
> ```
> ~17% of the outputs are incorrect and never retracted.
> [Full output](https://gist.githubusercontent.com/jamii/983fee41609b1425fe7fa59d3249b249/raw/069b9dcd4faf9f6113114381bc7028c6642ca787/gistfile1.txt)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
