garcia created FLINK-35218:
------------------------------
Summary: Duplicated values caused by expired state TTL
Key: FLINK-35218
URL: https://issues.apache.org/jira/browse/FLINK-35218
Project: Flink
Issue Type: Bug
Reporter: garcia
Attachments: image-2024-04-23-15-34-32-860.png
Hi,
We use state TTL to clean up the state of our Flink input tables via the
`table.exec.state.ttl` configuration option.
However, we encounter an issue once the TTL expires, as illustrated in the
following scenario.
Given this input_table:
{code:json}
{
  "$schema": "http://json-schema.org/draft-07/schema",
  "$id": "http://example.com/example.json",
  "type": "object",
  "title": "bet placed schema",
  "required": [
    "placement_date"
  ],
  "properties": {
    "bet_id": {
      "$id": "#/properties/bet_id",
      "type": "string"
    },
    "regulator": {
      "$id": "#/properties/regulator",
      "type": "string"
    },
    "match_id": {
      "$id": "#/properties/match_id",
      "type": "integer"
    },
    "combo_id": {
      "$id": "#/properties/combo_id",
      "type": "integer"
    },
    "is_live": {
      "$id": "#/properties/is_live",
      "type": "boolean"
    },
    "offer_catalog": {
      "$id": "#/properties/offer_catalog",
      "type": "string"
    },
    "combo_selection_nbr": {
      "$id": "#/properties/combo_selection_nbr",
      "type": "integer"
    }
  },
  "additionalProperties": true
}
{code}
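For context, a hypothetical way to expose these events as the `bet_placed_view` table that the query below reads from might look as follows. This DDL is only a sketch: the connector, topic, and broker settings are placeholders, the `event_timestamp` / `combo_bet_selections` columns are taken from the query rather than from the schema above, and it uses the `tableEnv` created in the configuration below.
{code:scala}
// Hypothetical DDL (sketch): all connector options are placeholders.
tableEnv.executeSql(
  """CREATE TABLE bet_placed_view (
    |  bet_id STRING,
    |  regulator STRING,
    |  event_timestamp TIMESTAMP(3),
    |  combo_bet_selections STRING
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'bet-placed',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'scan.startup.mode' = 'earliest-offset',
    |  'format' = 'json'
    |)""".stripMargin)
{code}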
This configuration:
{code:scala}
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// `conf` (the Flink Configuration) is created elsewhere.
val streamEnv = new StreamExecutionEnvironment(
  JavaStreamExecutionEnvironment.getExecutionEnvironment(conf))
val tableEnv = StreamTableEnvironment.create(streamEnv)

tableEnv.getConfig.getConfiguration
  .setString("table.local-time-zone", "UTC")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.mini-batch.enabled", "true")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.mini-batch.allow-latency", "5 s")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.mini-batch.size", "5000")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.state.ttl",
    TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES).toString) // 60000 ms = 1 minute
{code}
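Incidentally, the same 1-minute retention can also be set through the typed setter on TableConfig (a sketch; assumes Flink 1.13+, where `setIdleStateRetention` is available):
{code:scala}
import java.time.Duration

// Equivalent to setting "table.exec.state.ttl" to 60000 ms:
tableEnv.getConfig.setIdleStateRetention(Duration.ofMinutes(1))
{code}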
And this (simplified) query:
{code:sql}
WITH exploded_combos AS (
select
event_timestamp,
CAST(JSON_VALUE(combos.combo, '$.match_id') AS BIGINT) as match_id,
CAST(
JSON_VALUE(combos.combo, '$.combo_selection_id') AS BIGINT
) as combo_id,
CAST(JSON_VALUE(combos.combo, '$.is_live') AS BOOLEAN) as is_live,
CAST(RegulatorToCatalog(regulator) AS VARCHAR) as offer_catalog,
CARDINALITY(
JsonToArray(JSON_QUERY(combos.combo, '$.bet_selections'))
) as combo_selections_nbr,
combo_bet_selections
from
bet_placed_view
CROSS JOIN UNNEST(JsonToArray(combo_bet_selections)) AS combos(combo)
),
agg_match AS (
SELECT
match_id,
LOWER(offer_catalog) as offer_catalog,
MAX(event_timestamp) AS last_event_time_utc,
COUNT(*) AS bet_count
FROM
exploded_combos
WHERE
match_id IS NOT NULL
AND combo_id IS NOT NULL
AND offer_catalog IS NOT NULL
AND combo_bet_selections IS NOT NULL
GROUP BY
match_id,
LOWER(offer_catalog)
),
agg_combo AS (
SELECT
match_id,
combo_id,
combo_selections_nbr,
is_live,
LOWER(offer_catalog) AS offer_catalog,
MAX(event_timestamp) AS last_event_time_utc,
COUNT(*) as bet_count
FROM
exploded_combos
WHERE
match_id IS NOT NULL
AND combo_id IS NOT NULL
AND (
combo_selections_nbr = 3
OR combo_selections_nbr = 2
)
AND offer_catalog IS NOT NULL
GROUP BY
match_id,
combo_id,
combo_selections_nbr,
is_live,
LOWER(offer_catalog)
),
ranked_filtered_agg_combo_main_page AS (
SELECT
match_id,
combo_id,
offer_catalog,
bet_count,
ROW_NUMBER() OVER (
PARTITION BY match_id,
offer_catalog
ORDER BY
bet_count DESC,
combo_id DESC
) AS rank_combo
FROM
agg_combo
WHERE
combo_selections_nbr = 3
),
joined_filtered_agg_match_main_page AS (
SELECT
ranked_filtered_agg_combo_main_page.match_id,
ranked_filtered_agg_combo_main_page.offer_catalog,
ranked_filtered_agg_combo_main_page.bet_count,
ranked_filtered_agg_combo_main_page.combo_id,
ROW_NUMBER() OVER (
PARTITION BY agg_match.offer_catalog
ORDER BY
agg_match.bet_count DESC,
agg_match.match_id DESC
) AS rank_match
FROM
agg_match
INNER JOIN ranked_filtered_agg_combo_main_page ON
ranked_filtered_agg_combo_main_page.match_id = agg_match.match_id
AND ranked_filtered_agg_combo_main_page.offer_catalog =
agg_match.offer_catalog
WHERE
ranked_filtered_agg_combo_main_page.rank_combo = 1
)
SELECT
partition_key,
match_id,
offer_catalog,
false as live,
LAST_VALUE(last_event_utc) AS last_event_utc,
LAST_VALUE(last_event_utc) AS max_last_event_utc,
LAST_VALUE(top) AS top
FROM (
SELECT
'<all>' as match_id,
offer_catalog,
'<all>' || '#openmatch#' || offer_catalog as partition_key,
CAST('MAX(very_last_event_time_utc)' AS VARCHAR) AS last_event_utc,
'[' || LISTAGG(
'{"match_id":' || CAST(match_id AS VARCHAR) || ',"rank":' ||
CAST(rank_match AS VARCHAR) || ',"count":' || CAST(bet_count AS VARCHAR) ||
',"combo_id":"' || CAST(combo_id AS VARCHAR) || '","offer_catalog":"' ||
CAST(offer_catalog AS VARCHAR) || '","live": ' || '}'
) || ']' AS top
FROM
joined_filtered_agg_match_main_page
WHERE
rank_match <= 5
GROUP BY
offer_catalog
)
GROUP BY
partition_key,
match_id,
offer_catalog
{code}
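For completeness, we materialize the query roughly as follows (a sketch: the sink is simplified to a print of the changelog, and the job name is a placeholder). The +I (insert) / +U (update-after) prefixes in the output below are the row kinds of this changelog stream:
{code:scala}
// `query` holds the SQL text above; assumes Flink 1.14+ for toChangelogStream.
val resultTable = tableEnv.sqlQuery(query)
tableEnv.toChangelogStream(resultTable).print()
streamEnv.execute("top-combos")
{code}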
As you can see in the result below, once the TTL is reached, duplicate values
appear in our output:
{code}
+I[<all>#openmatch#betclic_fr, <all>, betclic_fr,
false, MAX(very_last_event_time_utc), MAX(very_last_event_time_utc),
[
{
"match_id": 2,
"rank": 1,
"count": 2,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 5,
"rank": 2,
"count": 1,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
}
]
]
+U[<all>#openmatch#betclic_fr, <all>, betclic_fr,
false, MAX(very_last_event_time_utc), MAX(very_last_event_time_utc),
[
{
"match_id": 5,
"rank": 2,
"count": 1,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 2,
"rank": 1,
"count": 3,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 3,
"rank": 3,
"count": 2,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 1,
"rank": 4,
"count": 3,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 4,
"rank": 5,
"count": 1,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
}
]
]
...
// DUPLICATED VALUES (multiple times the same rank):
+U[<all>#openmatch#betclic_fr, <all>, betclic_fr,
false, MAX(very_last_event_time_utc), MAX(very_last_event_time_utc),
[
{
"match_id": 3,
"rank": 1,
"count": 16,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 5,
"rank": 3,
"count": 14,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 2,
"rank": 2,
"count": 15,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 4,
"rank": 4,
"count": 14,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 1,
"rank": 5,
"count": 12,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 2,
"rank": 2,
"count": 15,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 1,
"rank": 5,
"count": 12,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 5,
"rank": 3,
"count": 16,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 3,
"rank": 1,
"count": 18,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
},
{
"match_id": 4,
"rank": 4,
"count": 15,
"combo_id": "123",
"offer_catalog": "betclic_fr",
"live":
}
]
]
...
{code}
*Is this a bug, or did we misunderstand how to use the TTL?*
Flink also emits this warning:
!image-2024-04-23-15-34-32-860.png!
We actually found two workarounds:
* Set a very large TTL (1 year; see the sketch below). This eliminates most of
the problems thanks to our business logic.
* Use a sliding window (but that would require a 50-day window sliding every
minute).
We're afraid that these two solutions will be very expensive.
Can you tell us whether this is normal behavior, or whether we misunderstood
something about the TTL?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)