Dawid Wysakowicz created FLINK-20306:
----------------------------------------
Summary: Accessing a versioned table as of time fails with a cryptic message
Key: FLINK-20306
URL: https://issues.apache.org/jira/browse/FLINK-20306
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: Dawid Wysakowicz
Fix For: 1.12.0
I tried running a query on a versioned table:
{code}
CREATE TABLE RatesHistory (
    currency_time TIMESTAMP(3) METADATA FROM 'timestamp',
    currency STRING,
    rate DECIMAL(38, 10),
    WATERMARK FOR currency_time AS currency_time -- defines the event time
) WITH (
    'connector' = 'kafka',
    'topic' = 'rates',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json' -- this is an append-only source
);

SELECT * FROM RatesHistory FOR SYSTEM_TIME AS OF TIMESTAMP '2020-11-11 13:12:13';
{code}
I understand that this might not be supported yet, but the exception I got is
not very helpful:
{code}
org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
Missing conversion is FlinkLogicalSnapshot[convention: LOGICAL -> STREAM_PHYSICAL]
There is 1 empty subset: rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
977:FlinkLogicalSnapshot(period=[2020-11-11 13:12:13])
  975:FlinkLogicalCalc(subset=[rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE]], select=[CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate])
    962:FlinkLogicalTableSourceScan(subset=[rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]]], fields=[currency, rate, timestamp])
Root: rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
Original rel:
FlinkLogicalLegacySink(subset=[rel#117:RelSubset#4.LOGICAL.any.None:
0.[NONE].[NONE]],
name=[`default_catalog`.`default_database`.`_tmp_table_1885690557`],
fields=[currency_time, currency, rate]): rowcount = 1.0E8, cumulative cost =
{1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 127
FlinkLogicalCalc(subset=[rel#126:RelSubset#3.LOGICAL.any.None:
0.[NONE].[NONE]], select=[CAST(currency_time) AS currency_time, currency,
CAST(rate) AS rate]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu,
0.0 io, 0.0 network, 0.0 memory}, id = 129
FlinkLogicalWatermarkAssigner(subset=[rel#124:RelSubset#2.LOGICAL.any.None:
0.[NONE].[NONE]], rowtime=[currency_time], watermark=[$0]): rowcount = 1.0E8,
cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id
= 123
FlinkLogicalCalc(subset=[rel#122:RelSubset#1.LOGICAL.any.None:
0.[NONE].[NONE]], select=[CAST(timestamp) AS currency_time, currency, rate]):
rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network,
0.0 memory}, id = 128
FlinkLogicalTableSourceScan(subset=[rel#120:RelSubset#0.LOGICAL.any.None:
0.[NONE].[NONE]], table=[[default_catalog, default_database, RatesHistory]],
fields=[currency, rate, timestamp]): rowcount = 1.0E8, cumulative cost = {1.0E8
rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, id = 119
Sets:
Set#42, type: RecordType(VARCHAR(2147483647) currency, DECIMAL(38, 10) rate,
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) timestamp)
rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#962
rel#962:FlinkLogicalTableSourceScan.LOGICAL.any.None:
0.[NONE].[NONE](table=[default_catalog, default_database, RatesHistory,
watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp),
rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network,
0.0 memory}
rel#984:RelSubset#42.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE],
best=rel#983
rel#983:StreamExecTableSourceScan.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE](table=[default_catalog, default_database, RatesHistory,
watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp),
rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network,
0.0 memory}
Set#43, type: RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647)
currency, DECIMAL(38, 18) rate)
rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#975
rel#975:FlinkLogicalCalc.LOGICAL.any.None:
0.[NONE].[NONE](input=RelSubset#974,select=CAST(Reinterpret(CAST(timestamp)))
AS currency_time, currency, CAST(rate) AS rate), rowcount=1.0E8, cumulative
cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
rel#986:RelSubset#43.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE],
best=rel#985
rel#985:StreamExecCalc.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE](input=RelSubset#984,select=CAST(Reinterpret(CAST(timestamp)))
AS currency_time, currency, CAST(rate) AS rate), rowcount=1.0E8, cumulative
cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
Set#44, type: RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647)
currency, DECIMAL(38, 18) rate)
rel#978:RelSubset#44.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#977
rel#977:FlinkLogicalSnapshot.LOGICAL.any.None:
0.[NONE].[NONE](input=RelSubset#976,period=2020-11-11 13:12:13),
rowcount=1.0E8, cumulative cost={3.0E8 rows, 2.0E8 cpu, 7.2E9 io, 0.0 network,
0.0 memory}
rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE],
best=null
Set#45, type: RecordType:peek_no_expand(BOOLEAN f0,
RecordType:peek_no_expand(TIMESTAMP(3) currency_time, VARCHAR(2147483647)
currency, DECIMAL(38, 18) rate) f1)
rel#980:RelSubset#45.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#979
rel#979:FlinkLogicalLegacySink.LOGICAL.any.None:
0.[NONE].[NONE](input=RelSubset#978,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time,
currency, rate), rowcount=1.0E8, cumulative cost={4.0E8 rows, 3.0E8 cpu, 7.2E9
io, 0.0 network, 0.0 memory}
rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE],
best=null
rel#982:AbstractConverter.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE](input=RelSubset#980,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None:
0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8,
cumulative cost={inf}
rel#988:StreamExecLegacySink.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE](input=RelSubset#987,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time,
currency, rate), rowcount=1.0E8, cumulative cost={inf}
Graphviz:
digraph G {
root [style=filled,label="Root"];
subgraph cluster42{
label="Set 42 RecordType(VARCHAR(2147483647) currency,
DECIMAL(38, 10) rate, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) timestamp)";
rel962
[label="rel#962:FlinkLogicalTableSourceScan\ntable=[default_catalog,
default_database, RatesHistory,
watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate,
timestamp\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0
memory}",color=blue,shape=box]
rel983
[label="rel#983:StreamExecTableSourceScan\ntable=[default_catalog,
default_database, RatesHistory,
watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate,
timestamp\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0
memory}",color=blue,shape=box]
subset974 [label="rel#974:RelSubset#42.LOGICAL.any.None:
0.[NONE].[NONE]"]
subset984
[label="rel#984:RelSubset#42.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
}
subgraph cluster43{
label="Set 43 RecordType(TIMESTAMP(3) currency_time,
VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)";
rel975
[label="rel#975:FlinkLogicalCalc\ninput=RelSubset#974,select=CAST(Reinterpret(CAST(timestamp)))
AS currency_time, currency, CAST(rate) AS rate\nrows=1.0E8, cost={2.0E8 rows,
1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel985
[label="rel#985:StreamExecCalc\ninput=RelSubset#984,select=CAST(Reinterpret(CAST(timestamp)))
AS currency_time, currency, CAST(rate) AS rate\nrows=1.0E8, cost={2.0E8 rows,
1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
subset976 [label="rel#976:RelSubset#43.LOGICAL.any.None:
0.[NONE].[NONE]"]
subset986
[label="rel#986:RelSubset#43.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
}
subgraph cluster44{
label="Set 44 RecordType(TIMESTAMP(3) currency_time,
VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)";
rel977
[label="rel#977:FlinkLogicalSnapshot\ninput=RelSubset#976,period=2020-11-11
13:12:13\nrows=1.0E8, cost={3.0E8 rows, 2.0E8 cpu, 7.2E9 io, 0.0 network, 0.0
memory}",color=blue,shape=box]
subset978 [label="rel#978:RelSubset#44.LOGICAL.any.None:
0.[NONE].[NONE]"]
subset987
[label="rel#987:RelSubset#44.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE]",color=red]
}
subgraph cluster45{
label="Set 45 RecordType:peek_no_expand(BOOLEAN f0,
RecordType:peek_no_expand(TIMESTAMP(3) currency_time, VARCHAR(2147483647)
currency, DECIMAL(38, 18) rate) f1)";
rel979
[label="rel#979:FlinkLogicalLegacySink\ninput=RelSubset#978,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time,
currency, rate\nrows=1.0E8, cost={4.0E8 rows, 3.0E8 cpu, 7.2E9 io, 0.0
network, 0.0 memory}",color=blue,shape=box]
rel982
[label="rel#982:AbstractConverter\ninput=RelSubset#980,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None:
0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8,
cost={inf}",shape=box]
rel988
[label="rel#988:StreamExecLegacySink\ninput=RelSubset#987,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time,
currency, rate\nrows=1.0E8, cost={inf}",shape=box]
subset980 [label="rel#980:RelSubset#45.LOGICAL.any.None:
0.[NONE].[NONE]"]
subset981
[label="rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
}
root -> subset981;
subset974 -> rel962[color=blue];
subset984 -> rel983[color=blue];
subset976 -> rel975[color=blue]; rel975 -> subset974[color=blue];
subset986 -> rel985[color=blue]; rel985 -> subset984[color=blue];
subset978 -> rel977[color=blue]; rel977 -> subset976[color=blue];
subset980 -> rel979[color=blue]; rel979 -> subset978[color=blue];
subset981 -> rel982; rel982 -> subset980;
subset981 -> rel988; rel988 -> subset987;
}
{code}
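For contrast, here is a minimal sketch of the form in which FOR SYSTEM_TIME AS OF is documented to work in 1.12, namely an event-time temporal join. The Orders table and its columns (order_id, price, currency, order_time) are hypothetical and not part of this report, and the versioned_rates view name is only illustrative. Because RatesHistory is append-only and has no primary key, the sketch assumes a deduplication view is used to derive a versioned view first:
{code}
-- Assumption: derive a versioned view from the append-only RatesHistory;
-- the PARTITION BY column acts as the inferred primary key of the view.
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY currency
                              ORDER BY currency_time DESC) AS rowNum
    FROM RatesHistory
)
WHERE rowNum = 1;

-- Assumption: Orders is a hypothetical table whose order_time column is an
-- event-time attribute (it has a WATERMARK defined on it).
SELECT o.order_id,
       o.price * r.rate AS converted_price
FROM Orders AS o
LEFT JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
{code}
The query in this report differs in that it takes the snapshot at a constant timestamp outside of a join, which appears to be the case the planner has no rule for.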