[ https://issues.apache.org/jira/browse/FLINK-30770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Thomas Erlenbach updated FLINK-30770:
-------------------------------------
Description:
Hi,
I would like to share a scenario that seems to reveal a bug:
Given the data,
{code:java}
// file /tmp/input_table.jsonl
{"event_time_utc": "2023-01-23T10:02:41.677798", "match_id": "100"}
{code}
{code:java}
// file /tmp/matches_state.jsonl
{"match_id": "100", "status": "PREMATCH", "event_time_utc": "2023-01-23T10:02:40.000000"}
{"match_id": "100", "status": "CLOSE", "event_time_utc": "2023-01-23T10:02:50.000000"}
{"match_id": "100", "status": "ENDED", "event_time_utc": "2023-01-23T10:02:55.000000"}
{"match_id": "100", "status": "ENDED-1", "event_time_utc": "2023-01-23T10:03:55.000000"}
{"match_id": "100", "status": "ENDED-2", "event_time_utc": "2023-01-23T10:04:55.000000"}
{"match_id": "100", "status": "ENDED-3", "event_time_utc": "2023-01-23T10:05:55.000000"}
{code}
and the scenario,
{code:sql}
-- scenario.sql
SET 'execution.runtime-mode' = 'streaming';
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE TABLE input_table (
  match_id STRING NOT NULL,
  event_time_utc TIMESTAMP(3) NOT NULL
) WITH (
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601',
  'path' = '/tmp/input_table.jsonl',
  'connector' = 'filesystem'
);

CREATE TABLE matches_state (
  match_id STRING NOT NULL,
  status STRING NOT NULL,
  event_time_utc TIMESTAMP(3) NOT NULL
) WITH (
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601',
  'path' = '/tmp/matches_state.jsonl',
  'connector' = 'filesystem'
);

WITH last_matches_state AS (
  SELECT match_id, status
  FROM (
    SELECT
      match_id,
      status,
      ROW_NUMBER() OVER (PARTITION BY match_id ORDER BY event_time_utc DESC) AS rownum
    FROM matches_state
  )
  WHERE rownum = 1
)
, agg_match AS (
  SELECT
    match_id,
    COUNT(*) AS bet_count
  FROM input_table
  GROUP BY match_id
)
SELECT
  agg_match.match_id,
  agg_match.bet_count,
  last_matches_state.status
FROM agg_match
LEFT JOIN last_matches_state
  ON last_matches_state.match_id = agg_match.match_id;
{code}
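As a rough illustration of what the join receives from its right-hand side, here is a minimal Python sketch of the changelog the last_matches_state Top-1 subquery could emit for /tmp/matches_state.jsonl. It assumes the rows are read in file order. It also assumes that a new rank-1 row is reported as a -U of the previous row followed by a +U of the new one, which is what the inner join output suggests. top1_changelog is a hypothetical helper, not Flink's Rank operator.

{code:python}
from datetime import datetime

# Contents of /tmp/matches_state.jsonl, in file order (assumed read order).
records = [
    {"match_id": "100", "status": "PREMATCH", "event_time_utc": "2023-01-23T10:02:40.000000"},
    {"match_id": "100", "status": "CLOSE", "event_time_utc": "2023-01-23T10:02:50.000000"},
    {"match_id": "100", "status": "ENDED", "event_time_utc": "2023-01-23T10:02:55.000000"},
    {"match_id": "100", "status": "ENDED-1", "event_time_utc": "2023-01-23T10:03:55.000000"},
    {"match_id": "100", "status": "ENDED-2", "event_time_utc": "2023-01-23T10:04:55.000000"},
    {"match_id": "100", "status": "ENDED-3", "event_time_utc": "2023-01-23T10:05:55.000000"},
]


def top1_changelog(rows):
    """Hypothetical model of ROW_NUMBER() ... ORDER BY event_time_utc DESC with rownum = 1."""
    top = {}  # match_id -> (event_time, status) currently ranked first
    out = []
    for r in rows:
        key = r["match_id"]
        candidate = (datetime.fromisoformat(r["event_time_utc"]), r["status"])
        current = top.get(key)
        if current is None:
            # First row for this key becomes rank 1.
            top[key] = candidate
            out.append(("+I", key, candidate[1]))
        elif candidate[0] > current[0]:
            # Assumed behaviour: retract the old rank-1 row, then emit the new one.
            top[key] = candidate
            out.append(("-U", key, current[1]))
            out.append(("+U", key, candidate[1]))
        # An older row does not change rank 1, so nothing is emitted.
    return out


changelog = top1_changelog(records)
for op, match_id, status in changelog:
    print(op, match_id, status)
{code}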
Here is what I get from the query:
{code:java}
// Result
+----+--------------------------------+----------------------+--------------------------------+
| op |                       match_id |            bet_count |                         status |
+----+--------------------------------+----------------------+--------------------------------+
| +I |                            100 |                    1 |                         <NULL> |
| -D |                            100 |                    1 |                         <NULL> |
| +I |                            100 |                    1 |                       PREMATCH |
| -U |                            100 |                    1 |                       PREMATCH |
| +I |                            100 |                    1 |                         <NULL> |
| -D |                            100 |                    1 |                         <NULL> |
| +I |                            100 |                    1 |                          CLOSE |
| -U |                            100 |                    1 |                          CLOSE |
| +I |                            100 |                    1 |                         <NULL> |
| -D |                            100 |                    1 |                         <NULL> |
| +I |                            100 |                    1 |                          ENDED |
| -U |                            100 |                    1 |                          ENDED |
| +I |                            100 |                    1 |                         <NULL> |
| -D |                            100 |                    1 |                         <NULL> |
| +I |                            100 |                    1 |                        ENDED-1 |
| -U |                            100 |                    1 |                        ENDED-1 |
| +I |                            100 |                    1 |                         <NULL> |
| -D |                            100 |                    1 |                         <NULL> |
| +I |                            100 |                    1 |                        ENDED-2 |
| -U |                            100 |                    1 |                        ENDED-2 |
| +I |                            100 |                    1 |                         <NULL> |
| -D |                            100 |                    1 |                         <NULL> |
| +I |                            100 |                    1 |                        ENDED-3 |
+----+--------------------------------+----------------------+--------------------------------+
Received a total of 23 rows
{code}
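The 23 rows above can be reproduced with a simplified model of a retracting left outer join. This is a hypothetical Python sketch, not Flink's join operator; left_outer_join and the event tuples are made up for illustration. It assumes the agg_match row (100, 1) arrives before any status, and that last_matches_state then emits +I PREMATCH followed by -U/+U pairs. Under those assumptions, every -U leaves the left row without a match, so the model emits a NULL-padded row. It retracts that row with -D as soon as the next +U arrives.

{code:python}
STATUSES = ["PREMATCH", "CLOSE", "ENDED", "ENDED-1", "ENDED-2", "ENDED-3"]

# Assumed arrival order: the agg_match row first, then the Top-1 changelog.
events = [("L", "+I", ("100", 1)), ("R", "+I", ("100", STATUSES[0]))]
for old, new in zip(STATUSES, STATUSES[1:]):
    events += [("R", "-U", ("100", old)), ("R", "+U", ("100", new))]


def left_outer_join(events):
    """Hypothetical retracting LEFT OUTER JOIN on match_id (row[0])."""
    left, right = {}, {}  # match_id -> rows currently held on each side
    out = []
    for side, op, row in events:
        key = row[0]
        if side == "L":
            if op != "+I":
                raise NotImplementedError("left-side retractions are not modelled")
            left.setdefault(key, []).append(row)
            matches = right.get(key, [])
            for rrow in matches:
                out.append(("+I", row[0], row[1], rrow[1]))
            if not matches:
                out.append(("+I", row[0], row[1], None))  # NULL-padded row
            continue
        rrows = right.setdefault(key, [])
        if op in ("+I", "+U"):
            had_match = bool(rrows)
            rrows.append(row)
            for lrow in left.get(key, []):
                if not had_match:
                    out.append(("-D", lrow[0], lrow[1], None))  # retract the NULL-padded row
                out.append(("+I", lrow[0], lrow[1], row[1]))
        else:  # "-U" or "-D" from the status side
            rrows.remove(row)
            for lrow in left.get(key, []):
                out.append((op, lrow[0], lrow[1], row[1]))
                if not rrows:
                    out.append(("+I", lrow[0], lrow[1], None))  # left row is unmatched again
    return out


rows = left_outer_join(events)
for op, match_id, bet_count, status in rows:
    print(op, match_id, bet_count, "<NULL>" if status is None else status)
print(len(rows))  # 23
{code}

If this model is close to what happens, the NULL rows come from handling the -U and the +U of a single status change as two independent events.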
The first CTE (last_matches_state) computes the latest status of each match, the second (agg_match) counts the input rows per match_id, and the final query left outer joins that count with the match status.
However, on every match status update, I see that *some rows are emitted with status set to NULL*, even though a status already exists for the corresponding match.
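For reference, here is a minimal Python sketch that evaluates the same three steps over the two input files with batch semantics: latest status per match_id, COUNT(*) per match_id, then a left join. It only shows the single row the streaming result is expected to converge to. The variable names are illustrative.

{code:python}
from collections import Counter
from datetime import datetime

# /tmp/input_table.jsonl
input_rows = [{"event_time_utc": "2023-01-23T10:02:41.677798", "match_id": "100"}]

# /tmp/matches_state.jsonl as (match_id, status, event_time_utc)
state_rows = [
    ("100", "PREMATCH", "2023-01-23T10:02:40.000000"),
    ("100", "CLOSE", "2023-01-23T10:02:50.000000"),
    ("100", "ENDED", "2023-01-23T10:02:55.000000"),
    ("100", "ENDED-1", "2023-01-23T10:03:55.000000"),
    ("100", "ENDED-2", "2023-01-23T10:04:55.000000"),
    ("100", "ENDED-3", "2023-01-23T10:05:55.000000"),
]

# last_matches_state: keep the status with the latest event_time_utc per match_id
latest = {}
for match_id, status, ts in state_rows:
    t = datetime.fromisoformat(ts)
    if match_id not in latest or t > latest[match_id][0]:
        latest[match_id] = (t, status)

# agg_match: COUNT(*) per match_id
bet_count = Counter(r["match_id"] for r in input_rows)

# LEFT JOIN: keep every agg_match row; status is None (NULL) when no state exists
result = [
    (match_id, count, latest[match_id][1] if match_id in latest else None)
    for match_id, count in bet_count.items()
]
print(result)  # [('100', 1, 'ENDED-3')]
{code}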
Interestingly, replacing the left outer join with an inner join produces a different output:
{code:java}
WITH last_matches_state AS (
  SELECT match_id, status
  FROM (
    SELECT
      match_id,
      status,
      ROW_NUMBER() OVER (PARTITION BY match_id ORDER BY event_time_utc DESC) AS rownum
    FROM matches_state
  )
  WHERE rownum = 1
)
, agg_match AS (
  SELECT
    match_id,
    COUNT(*) AS bet_count
  FROM input_table
  GROUP BY match_id
)
SELECT
  agg_match.match_id,
  agg_match.bet_count,
  last_matches_state.status
FROM agg_match
INNER JOIN last_matches_state
  ON last_matches_state.match_id = agg_match.match_id;

+----+--------------------------------+----------------------+--------------------------------+
| op |                       match_id |            bet_count |                         status |
+----+--------------------------------+----------------------+--------------------------------+
| +I |                            100 |                    1 |                       PREMATCH |
| -U |                            100 |                    1 |                       PREMATCH |
| +U |                            100 |                    1 |                          CLOSE |
| -U |                            100 |                    1 |                          CLOSE |
| +U |                            100 |                    1 |                          ENDED |
| -U |                            100 |                    1 |                          ENDED |
| +U |                            100 |                    1 |                        ENDED-1 |
| -U |                            100 |                    1 |                        ENDED-1 |
| +U |                            100 |                    1 |                        ENDED-2 |
| -U |                            100 |                    1 |                        ENDED-2 |
| +U |                            100 |                    1 |                        ENDED-3 |
+----+--------------------------------+----------------------+--------------------------------+
{code}
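The inner join output above can be reproduced with a hypothetical Python sketch of a retracting inner join. inner_join and the event tuples are illustrative, not Flink's operator. The sketch assumes the agg_match row arrives first, followed by +I PREMATCH and then -U/+U pairs from last_matches_state. Because an inner join never emits a NULL-padded row, each -U/+U from the status side passes through as a -U/+U of the joined row.

{code:python}
STATUSES = ["PREMATCH", "CLOSE", "ENDED", "ENDED-1", "ENDED-2", "ENDED-3"]

# Assumed arrival order: the agg_match row first, then the Top-1 changelog.
events = [("L", "+I", ("100", 1)), ("R", "+I", ("100", STATUSES[0]))]
for old, new in zip(STATUSES, STATUSES[1:]):
    events += [("R", "-U", ("100", old)), ("R", "+U", ("100", new))]


def inner_join(events):
    """Hypothetical retracting INNER JOIN on match_id (row[0])."""
    left, right = {}, {}  # match_id -> rows currently held on each side
    out = []
    for side, op, row in events:
        key = row[0]
        mine, other = (left, right) if side == "L" else (right, left)
        if op in ("+I", "+U"):
            mine.setdefault(key, []).append(row)
        else:  # "-U" or "-D"
            mine[key].remove(row)
        for match in other.get(key, []):
            lrow, rrow = (row, match) if side == "L" else (match, row)
            # No NULL padding: the input row kind is passed through unchanged.
            out.append((op, lrow[0], lrow[1], rrow[1]))
    return out


rows = inner_join(events)
for op, match_id, bet_count, status in rows:
    print(op, match_id, bet_count, status)
print(len(rows))  # 11
{code}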
I would have expected the left outer join query to produce the same results as the inner join query (except for the first entry, emitted while there is no status yet).
Is there a reason why the left outer join query produces these partial results?
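One way to check whether the NULL rows are only transient is to fold each changelog into a materialized table, where +I/+U add a row and -U/-D remove it. This hypothetical Python sketch rebuilds both reported outputs from their observed patterns (not from a live Flink job) and folds them with an illustrative materialize helper. For this data, both end in the same single row. That suggests the difference lies in the intermediate changes a downstream consumer sees rather than in the final result.

{code:python}
from collections import Counter

STATUSES = ["PREMATCH", "CLOSE", "ENDED", "ENDED-1", "ENDED-2", "ENDED-3"]

# Rebuild the two reported outputs from their observed patterns.
left_join_output = []
for i, status in enumerate(STATUSES):
    left_join_output += [("+I", "100", 1, None), ("-D", "100", 1, None), ("+I", "100", 1, status)]
    if i < len(STATUSES) - 1:
        left_join_output.append(("-U", "100", 1, status))

inner_join_output = [("+I", "100", 1, STATUSES[0])]
for old, new in zip(STATUSES, STATUSES[1:]):
    inner_join_output += [("-U", "100", 1, old), ("+U", "100", 1, new)]


def materialize(changelog):
    """Fold a changelog into the set of rows it leaves behind."""
    counts = Counter()
    for op, *row in changelog:
        if op in ("+I", "+U"):
            counts[tuple(row)] += 1
        elif op in ("-U", "-D"):
            counts[tuple(row)] -= 1
        else:
            raise ValueError(f"unknown row kind: {op}")
    return {row for row, n in counts.items() if n > 0}


print(len(left_join_output), len(inner_join_output))  # 23 11
print(materialize(left_join_output))  # {('100', 1, 'ENDED-3')}
print(materialize(left_join_output) == materialize(inner_join_output))  # True
{code}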
I'd be happy to help should you need more details!
> OUTER join generates partial results
> ------------------------------------
>
> Key: FLINK-30770
> URL: https://issues.apache.org/jira/browse/FLINK-30770
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.13.5, 1.15.2
> Reporter: Thomas Erlenbach
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)