[
https://issues.apache.org/jira/browse/FLINK-30770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683396#comment-17683396
]
lincoln lee edited comment on FLINK-30770 at 2/2/23 12:49 PM:
--------------------------------------------------------------
[~terlenbach]
> but aren't -U records always followed by either a +U record or a +I and a -D
> record?
Not exactly. Take the following changelog as an example input:
{code}
id status
+I 100 PREMATCH
-U 100 PREMATCH
+U 100 CLOSE
{code}
If there is a filter condition "status = 'PREMATCH'", then in this case (not
the only case) the '-U 100 PREMATCH' record will be the last message sent to
the downstream operator, because the following '+U 100 CLOSE' record does not
satisfy the condition and is dropped.
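
To make this concrete, here is a minimal Python sketch of a stateless filter
applied to that changelog. It is only an illustration, not Flink code: the
apply_filter helper and the tuple/dict representation of change messages are
assumptions made for this example.

```python
def apply_filter(changelog, predicate):
    """Stateless filter: forward every change message whose row passes.

    The predicate is evaluated per message, regardless of the change kind.
    """
    return [(kind, row) for kind, row in changelog if predicate(row)]


# The example changelog from the comment above.
changelog = [
    ("+I", {"id": 100, "status": "PREMATCH"}),
    ("-U", {"id": 100, "status": "PREMATCH"}),
    ("+U", {"id": 100, "status": "CLOSE"}),
]

# Filter condition: status = 'PREMATCH'
filtered = apply_filter(changelog, lambda row: row["status"] == "PREMATCH")

for kind, row in filtered:
    print(kind, row["id"], row["status"])
```

In this sketch the '+U 100 CLOSE' message is dropped, so the downstream
operator sees the '-U' message with no '+U' after it.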
> OUTER join generates partial results
> ------------------------------------
>
> Key: FLINK-30770
> URL: https://issues.apache.org/jira/browse/FLINK-30770
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.13.5, 1.15.2
> Reporter: Thomas Erlenbach
> Priority: Major
>
> Hi,
>
> I would like to share a scenario with you that seems to reveal a bug to me:
> Given the data,
> {code:java}
> // file /tmp/input_table.jsonl
> {"event_time_utc": "2023-01-23T10:02:41.677798", "match_id": "100"}
> {code}
> {code:java}
> // file /tmp/matches_state.jsonl
>
> {"match_id": "100", "status": "PREMATCH", "event_time_utc": "2023-01-23T10:02:40.000000"}
> {"match_id": "100", "status": "CLOSE", "event_time_utc": "2023-01-23T10:02:50.000000"}
> {"match_id": "100", "status": "ENDED", "event_time_utc": "2023-01-23T10:02:55.000000"}
> {"match_id": "100", "status": "ENDED-1", "event_time_utc": "2023-01-23T10:03:55.000000"}
> {"match_id": "100", "status": "ENDED-2", "event_time_utc": "2023-01-23T10:04:55.000000"}
> {"match_id": "100", "status": "ENDED-3", "event_time_utc": "2023-01-23T10:05:55.000000"}
>
> {code}
>
> and the scenario,
> {code:java}
> // scenario.sql
> SET 'execution.runtime-mode' = 'streaming';
> SET 'sql-client.execution.result-mode' = 'tableau';
> CREATE TABLE input_table (
>   match_id STRING NOT NULL,
>   event_time_utc TIMESTAMP(3) NOT NULL
> )
> WITH (
>   'format' = 'json',
>   'json.timestamp-format.standard' = 'ISO-8601',
>   'path' = '/tmp/input_table.jsonl',
>   'connector' = 'filesystem'
> );
> CREATE TABLE matches_state (
>   match_id STRING NOT NULL,
>   status STRING NOT NULL,
>   event_time_utc TIMESTAMP(3) NOT NULL
> )
> WITH (
>   'format' = 'json',
>   'json.timestamp-format.standard' = 'ISO-8601',
>   'path' = '/tmp/matches_state.jsonl',
>   'connector' = 'filesystem'
> );
> WITH last_matches_state AS (
>   SELECT match_id, status
>   FROM (
>     SELECT
>       match_id,
>       status,
>       ROW_NUMBER() OVER (PARTITION BY match_id ORDER BY event_time_utc DESC) AS rownum
>     FROM matches_state
>   )
>   WHERE rownum = 1
> )
> , agg_match AS (
>   SELECT
>     match_id,
>     COUNT(*) AS bet_count
>   FROM input_table
>   GROUP BY match_id
> )
> SELECT
>   agg_match.match_id,
>   agg_match.bet_count,
>   last_matches_state.status
> FROM
>   agg_match
>   LEFT JOIN last_matches_state
>     ON last_matches_state.match_id = agg_match.match_id;
> {code}
>
> Here is what I get from the query:
> {code:java}
> // Result
> +----+----------+-----------+----------+
> | op | match_id | bet_count | status   |
> +----+----------+-----------+----------+
> | +I | 100      | 1         | <NULL>   |
> | -D | 100      | 1         | <NULL>   |
> | +I | 100      | 1         | PREMATCH |
> | -U | 100      | 1         | PREMATCH |
> | +I | 100      | 1         | <NULL>   |
> | -D | 100      | 1         | <NULL>   |
> | +I | 100      | 1         | CLOSE    |
> | -U | 100      | 1         | CLOSE    |
> | +I | 100      | 1         | <NULL>   |
> | -D | 100      | 1         | <NULL>   |
> | +I | 100      | 1         | ENDED    |
> | -U | 100      | 1         | ENDED    |
> | +I | 100      | 1         | <NULL>   |
> | -D | 100      | 1         | <NULL>   |
> | +I | 100      | 1         | ENDED-1  |
> | -U | 100      | 1         | ENDED-1  |
> | +I | 100      | 1         | <NULL>   |
> | -D | 100      | 1         | <NULL>   |
> | +I | 100      | 1         | ENDED-2  |
> | -U | 100      | 1         | ENDED-2  |
> | +I | 100      | 1         | <NULL>   |
> | -D | 100      | 1         | <NULL>   |
> | +I | 100      | 1         | ENDED-3  |
> +----+----------+-----------+----------+
> Received a total of 23 rows
> {code}
>
> The first part of the CTE query computes the last status of a match, the
> second part computes the number of rows by match_id, and then there is a left
> outer join query to link that count with the match status.
> However, on match updates, I see that *some rows are generated with a status
> set to NULL* whereas there was already a status set for the corresponding
> match.
>
> Interestingly, replacing the left outer join with an inner join generates a
> different output:
>
> {code:java}
> WITH last_matches_state AS (
>   SELECT match_id, status
>   FROM (
>     SELECT
>       match_id,
>       status,
>       ROW_NUMBER() OVER (PARTITION BY match_id ORDER BY event_time_utc DESC) AS rownum
>     FROM matches_state
>   )
>   WHERE rownum = 1
> )
> , agg_match AS (
>   SELECT
>     match_id,
>     COUNT(*) AS bet_count
>   FROM input_table
>   GROUP BY match_id
> )
> SELECT
>   agg_match.match_id,
>   agg_match.bet_count,
>   last_matches_state.status
> FROM
>   agg_match
>   INNER JOIN last_matches_state
>     ON last_matches_state.match_id = agg_match.match_id;
> +----+----------+-----------+----------+
> | op | match_id | bet_count | status   |
> +----+----------+-----------+----------+
> | +I | 100      | 1         | PREMATCH |
> | -U | 100      | 1         | PREMATCH |
> | +U | 100      | 1         | CLOSE    |
> | -U | 100      | 1         | CLOSE    |
> | +U | 100      | 1         | ENDED    |
> | -U | 100      | 1         | ENDED    |
> | +U | 100      | 1         | ENDED-1  |
> | -U | 100      | 1         | ENDED-1  |
> | +U | 100      | 1         | ENDED-2  |
> | -U | 100      | 1         | ENDED-2  |
> | +U | 100      | 1         | ENDED-3  |
> +----+----------+-----------+----------+
> {code}
> I would have expected to get the same results from the query with the left
> outer join (except the first entry when there is no status yet).
>
> Is there any reason for receiving these partial results with the left outer
> join query?
>
> I'd be happy to help should you need more details!
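
The NULL rows in the LEFT JOIN result quoted above are consistent with the
join processing each -U retraction from the right side before the matching +U
arrives. The following is a minimal Python sketch of that behaviour. It makes
two assumptions: that the ROW_NUMBER CTE emits +I for the first status and a
-U/+U pair for each later one (inferred from the INNER JOIN output), and that
the hypothetical left_outer_join helper is an acceptable simplification. It is
a model that mirrors the reported output, not Flink's join operator code.

```python
def left_outer_join(left_row, right_changelog):
    """Model the changelog a left outer join emits for a single left row.

    Assumption: simplified model written to mirror the reported output.
    """
    out = [("+I", *left_row, None)]  # left row arrives first: null-padded
    matches = 0                      # right rows currently matching
    for kind, status in right_changelog:
        if kind in ("+I", "+U"):     # accumulate message from the right side
            if matches == 0:
                out.append(("-D", *left_row, None))  # retract null-padded row
            out.append(("+I", *left_row, status))    # emit the joined row
            matches += 1
        else:                        # retract message (-U or -D)
            out.append((kind, *left_row, status))    # retract the joined row
            matches -= 1
            if matches == 0:
                out.append(("+I", *left_row, None))  # null-pad again
    return out


# Assumed output of the last_matches_state CTE for match_id 100.
statuses = ["PREMATCH", "CLOSE", "ENDED", "ENDED-1", "ENDED-2", "ENDED-3"]
right = [("+I", statuses[0])]
for old, new in zip(statuses, statuses[1:]):
    right += [("-U", old), ("+U", new)]

# Left row: (match_id, bet_count)
result = left_outer_join(("100", 1), right)
for row in result:
    print(row)
```

Under these assumptions the model yields 23 change rows, the same count as the
reported result, with a null-padded row emitted after every -U.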