twalthr commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1213151859
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##########
@@ -1132,22 +1132,14 @@ private static List<TestSetSpec> numericBounds() {
.fromCase(DOUBLE(), -1.7976931348623157E308d,
Float.NEGATIVE_INFINITY)
.build(),
CastTestSpecBuilder.testCastTo(DECIMAL(38, 0))
- .fromCase(TINYINT(), Byte.MIN_VALUE - 1, new
BigDecimal(Byte.MIN_VALUE - 1))
- .fromCase(TINYINT(), Byte.MAX_VALUE + 1, new
BigDecimal(Byte.MAX_VALUE + 1))
- .fromCase(
- SMALLINT(),
- Short.MIN_VALUE - 1,
- new BigDecimal(Short.MIN_VALUE - 1))
- .fromCase(
- SMALLINT(),
- Short.MAX_VALUE + 1,
- new BigDecimal(Short.MAX_VALUE + 1))
- .fromCase(
- INT(), Integer.MIN_VALUE - 1, new
BigDecimal(Integer.MIN_VALUE - 1))
- .fromCase(
- INT(), Integer.MAX_VALUE + 1, new
BigDecimal(Integer.MAX_VALUE + 1))
- .fromCase(BIGINT(), Long.MIN_VALUE - 1, new
BigDecimal(Long.MIN_VALUE - 1))
- .fromCase(BIGINT(), Long.MAX_VALUE + 1, new
BigDecimal(Long.MAX_VALUE + 1))
+ .fromCase(TINYINT(), Byte.MIN_VALUE - 1, new
BigDecimal(Byte.MAX_VALUE))
Review Comment:
I feel these tests are also kind of pointless if the subtraction and
addition are not executed in SQL. If we cast both the input and the expected
result to `byte`, it would make more sense.
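To illustrate the point (not part of the PR): `Byte.MIN_VALUE - 1` is evaluated by the Java compiler as `int` arithmetic before the value ever reaches the SQL cast, so the overflow never happens on the SQL side. A minimal sketch:

```java
public class BytePromotionDemo {
    public static void main(String[] args) {
        // Java promotes byte operands to int, so no byte overflow occurs here;
        // the test simply feeds an int literal value into the cast.
        int belowByteRange = Byte.MIN_VALUE - 1; // -129
        int aboveByteRange = Byte.MAX_VALUE + 1; // 128

        // Casting both sides to byte would exercise the wrap-around instead:
        byte wrapped = (byte) (Byte.MIN_VALUE - 1); // wraps to 127

        System.out.println(belowByteRange); // -129
        System.out.println(aboveByteRange); // 128
        System.out.println(wrapped);        // 127
    }
}
```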
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml:
##########
@@ -107,14 +107,14 @@
Calc(select=[*org.apache.flink.table.planner.expressions.utils.Func24$$2da7dcb3c
<TestCase name="testNotIn">
<Resource name="ast">
<![CDATA[
-LogicalFilter(condition=[OR(SEARCH($1, Sarg[(-∞..1), (1..2), (2..3), (3..4),
(4..5), (5..6), (6..7), (7..8), (8..9), (9..10), (10..11), (11..12), (12..13),
(13..14), (14..15), (15..16), (16..17), (17..18), (18..19), (19..20), (20..21),
(21..22), (22..23), (23..24), (24..25), (25..26), (26..27), (27..28), (28..29),
(29..30), (30..+∞)]), SEARCH($2, Sarg[(-∞.._UTF-16LE'xx'),
(_UTF-16LE'xx'..+∞)]:CHAR(2) CHARACTER SET "UTF-16LE"))])
+LogicalFilter(condition=[OR(SEARCH($1, Sarg[(-∞..1), (1..2), (2..3), (3..4),
(4..5), (5..6), (6..7), (7..8), (8..9), (9..10), (10..11), (11..12), (12..13),
(13..14), (14..15), (15..16), (16..17), (17..18), (18..19), (19..20), (20..21),
(21..22), (22..23), (23..24), (24..25), (25..26), (26..27), (27..28), (28..29),
(29..30), (30..+∞)]), <>($2, _UTF-16LE'xx'))])
Review Comment:
Is CALCITE-5134 really the right issue for this diff? Doesn't look related.
But the plan makes sense.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala:
##########
@@ -96,7 +96,7 @@ class SubqueryCorrelateVariablesValidationTest extends
SubQueryTestBase {
util.verifyRelPlan(sqlQuery)
}
- @Test(expected = classOf[TableException])
+ @Test
Review Comment:
What was the exception before?
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml:
##########
@@ -99,4 +99,34 @@ LogicalProject(min_t3d=[$9], max_t2h=[$11])
]]>
</Resource>
</TestCase>
+ <TestCase name="testWithFilterJoinCorrelate">
+ <Resource name="sql">
+ <![CDATA[
+SELECT t1a
+FROM t1
+WHERE EXISTS (SELECT max(t2h) FROM t2
+ LEFT OUTER JOIN t1 ttt
+ ON t2.t2a=t1.t1a)
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(t1a=[$0])
++- LogicalFilter(condition=[EXISTS({
+LogicalAggregate(group=[{}], EXPR$0=[MAX($0)])
+ LogicalProject(t2h=[$7])
+ LogicalJoin(condition=[=($0, $cor0.t1a)], joinType=[left])
+ LogicalTableScan(table=[[default_catalog, default_database, t2, source:
[TestTableSource(t2a, t2b, t2c, t2d, t2e, t2f, t2g, t2h, t2i)]]])
+ LogicalTableScan(table=[[default_catalog, default_database, t1, source:
[TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, t1, source:
[TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
Review Comment:
This looks incorrect to me. Did some rules not fire correctly? Or is this
really the correct optimization?
##########
flink-table/flink-sql-jdbc-driver/pom.xml:
##########
@@ -84,6 +84,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
Review Comment:
Isn't this a problem with `FlinkSqlParserImpl`? Maybe the parser uses Guava
for the first time and we don't declare the dependency correctly?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##########
@@ -469,10 +469,10 @@ private static List<TestSetSpec> allTypesBasic() {
.fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, 539222987)
.fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123)
.fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123)
- .fromCase(FLOAT(), 9234567891.12, 644633299)
+ .fromCase(FLOAT(), 9234567891.12, 2147483647)
Review Comment:
Great that this got fixed in Calcite, but it is a pretty significant change.
We need to communicate it properly via the release notes.
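If I read the new expected value right, the cast now saturates at `Integer.MAX_VALUE` instead of wrapping, which matches Java's own float-to-int narrowing conversion (the old expected value looks like a wrapped low 32 bits, though that is my guess, not something stated in the PR). A quick sketch of the saturating behavior:

```java
public class FloatToIntSaturation {
    public static void main(String[] args) {
        float f = 9234567891.12f; // far above Integer.MAX_VALUE

        // JLS narrowing conversion float -> int clamps out-of-range values
        int clamped = (int) f;

        System.out.println(clamped);           // 2147483647
        System.out.println(Integer.MAX_VALUE); // 2147483647
    }
}
```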
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##########
@@ -526,7 +526,7 @@ private static List<TestSetSpec> allTypesBasic() {
.fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT,
DEFAULT_NEGATIVE_BIGINT)
.fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123L)
.fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123L)
- .fromCase(FLOAT(), 9234567891.12, 9234567891L)
+ .fromCase(FLOAT(), 9234567891.12, 9234568192L)
Review Comment:
Can we change the FLOAT lines to `9234567891.12f` or
`Float.parseFloat("9234567891.12")`? The test currently uses doubles instead of
floats, which is confusing.
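For context on why the expected value is `9234568192L` and not `9234567891L` (my illustration, not from the PR): the nearest representable `float` to the literal is roughly 301 larger, because at that magnitude floats are spaced 1024 apart. Using an explicit `f` literal makes this visible:

```java
public class FloatPrecisionDemo {
    public static void main(String[] args) {
        // The plain literal is a double and keeps the exact integer part
        double asDouble = 9234567891.12;
        // The f-suffixed literal rounds to the nearest float
        float asFloat = 9234567891.12f; // same value as Float.parseFloat("9234567891.12")

        System.out.println((long) asDouble); // 9234567891
        System.out.println((long) asFloat);  // 9234568192
    }
}
```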
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml:
##########
@@ -379,30 +382,30 @@
LogicalLegacySink(name=[`default_catalog`.`default_database`.`retractSink2`], fi
<![CDATA[
LegacySink(name=[`default_catalog`.`default_database`.`retractSink1`],
fields=[a, total_c], changelogMode=[NONE])
+- Union(all=[true], union=[a, total_c], changelogMode=[I,UB,UA])
- :- Calc(select=[a, total_c], where=[>(a, 50)], changelogMode=[I,UB,UA])
- : +- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
- : +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT c) AS
total_c], changelogMode=[I,UB,UA])
- : +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
- : +- Calc(select=[a, b, f0 AS c], changelogMode=[I])
- : +- Correlate(invocation=[split($cor0.c)],
correlate=[table(split($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
- : +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c],
where=[AND(>=(b, UNIX_TIMESTAMP('${startTime}')), <>(c, ''))],
changelogMode=[I])
+ :- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT f0) AS
total_c], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+ : +- Calc(select=[a, b, f0], where=[>(a, 50)], changelogMode=[I])
+ : +- Correlate(invocation=[split($cor0.c)],
correlate=[table(split($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
+ : +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[<>(c,
'')], changelogMode=[I])
+ : +- Calc(select=[a, b, c], where=[>=(b,
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
Review Comment:
Two `Calc` nodes directly after each other?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##########
@@ -526,7 +526,7 @@ private static List<TestSetSpec> allTypesBasic() {
.fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT,
DEFAULT_NEGATIVE_BIGINT)
.fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123L)
.fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123L)
- .fromCase(FLOAT(), 9234567891.12, 9234567891L)
+ .fromCase(FLOAT(), 9234567891.12, 9234568192L)
Review Comment:
These are pretty significant changes :(
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml:
##########
@@ -478,33 +483,37 @@
LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink3`], fields=[t
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Union(all=[true], union=[a, c])(reuse_id=[1])
-:- Calc(select=[a, c])
-: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Calc(select=[d, f])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])(reuse_id=[1])
+
+LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])(reuse_id=[2])
LegacySink(name=[`default_catalog`.`default_database`.`sink1`], fields=[a, c])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[a, c])
+ :- Calc(select=[a, c])
+ : +- Reused(reference_id=[1])
+ +- Calc(select=[d, f])
Review Comment:
Why this change?
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml:
##########
@@ -379,30 +382,30 @@
LogicalLegacySink(name=[`default_catalog`.`default_database`.`retractSink2`], fi
<![CDATA[
LegacySink(name=[`default_catalog`.`default_database`.`retractSink1`],
fields=[a, total_c], changelogMode=[NONE])
+- Union(all=[true], union=[a, total_c], changelogMode=[I,UB,UA])
- :- Calc(select=[a, total_c], where=[>(a, 50)], changelogMode=[I,UB,UA])
- : +- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
- : +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT c) AS
total_c], changelogMode=[I,UB,UA])
- : +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
- : +- Calc(select=[a, b, f0 AS c], changelogMode=[I])
- : +- Correlate(invocation=[split($cor0.c)],
correlate=[table(split($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
- : +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c],
where=[AND(>=(b, UNIX_TIMESTAMP('${startTime}')), <>(c, ''))],
changelogMode=[I])
+ :- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT f0) AS
total_c], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+ : +- Calc(select=[a, b, f0], where=[>(a, 50)], changelogMode=[I])
+ : +- Correlate(invocation=[split($cor0.c)],
correlate=[table(split($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
+ : +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[<>(c,
'')], changelogMode=[I])
+ : +- Calc(select=[a, b, c], where=[>=(b,
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
: +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c], changelogMode=[I])
- +- Calc(select=[a, CAST(total_c AS BIGINT) AS total_c], where=[>(a, 50)],
changelogMode=[I])
- +- Calc(select=[a, 0 AS total_c], where=[>=(b,
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
+ +- Calc(select=[a, 0 AS total_c], where=[>(a, 50)], changelogMode=[I])
+ +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))],
changelogMode=[I])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c],
changelogMode=[I])
LegacySink(name=[`default_catalog`.`default_database`.`retractSink2`],
fields=[a, total_c], changelogMode=[NONE])
+- Union(all=[true], union=[a, total_c], changelogMode=[I,UB,UA])
- :- Calc(select=[a, total_c], where=[<(a, 50)], changelogMode=[I,UB,UA])
- : +- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
- : +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT c) AS
total_c], changelogMode=[I,UB,UA])
- : +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
- : +- Calc(select=[a, b, f0 AS c], changelogMode=[I])
- : +- Correlate(invocation=[split($cor0.c)],
correlate=[table(split($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
- : +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c],
where=[AND(>=(b, UNIX_TIMESTAMP('${startTime}')), <>(c, ''))],
changelogMode=[I])
+ :- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT f0) AS
total_c], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+ : +- Calc(select=[a, b, f0], where=[<(a, 50)], changelogMode=[I])
+ : +- Correlate(invocation=[split($cor0.c)],
correlate=[table(split($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
+ : +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[<>(c,
'')], changelogMode=[I])
+ : +- Calc(select=[a, b, c], where=[>=(b,
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
: +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c], changelogMode=[I])
- +- Calc(select=[a, CAST(total_c AS BIGINT) AS total_c], where=[<(a, 50)],
changelogMode=[I])
- +- Calc(select=[a, 0 AS total_c], where=[>=(b,
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
+ +- Calc(select=[a, 0 AS total_c], where=[<(a, 50)], changelogMode=[I])
+ +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))],
changelogMode=[I])
Review Comment:
Shouldn't `select=[a, b, c]` be `select=[a]`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]