twalthr commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1213151859


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##########
@@ -1132,22 +1132,14 @@ private static List<TestSetSpec> numericBounds() {
                         .fromCase(DOUBLE(), -1.7976931348623157E308d, Float.NEGATIVE_INFINITY)
                         .build(),
                 CastTestSpecBuilder.testCastTo(DECIMAL(38, 0))
-                        .fromCase(TINYINT(), Byte.MIN_VALUE - 1, new BigDecimal(Byte.MIN_VALUE - 1))
-                        .fromCase(TINYINT(), Byte.MAX_VALUE + 1, new BigDecimal(Byte.MAX_VALUE + 1))
-                        .fromCase(
-                                SMALLINT(),
-                                Short.MIN_VALUE - 1,
-                                new BigDecimal(Short.MIN_VALUE - 1))
-                        .fromCase(
-                                SMALLINT(),
-                                Short.MAX_VALUE + 1,
-                                new BigDecimal(Short.MAX_VALUE + 1))
-                        .fromCase(
-                                INT(), Integer.MIN_VALUE - 1, new BigDecimal(Integer.MIN_VALUE - 1))
-                        .fromCase(
-                                INT(), Integer.MAX_VALUE + 1, new BigDecimal(Integer.MAX_VALUE + 1))
-                        .fromCase(BIGINT(), Long.MIN_VALUE - 1, new BigDecimal(Long.MIN_VALUE - 1))
-                        .fromCase(BIGINT(), Long.MAX_VALUE + 1, new BigDecimal(Long.MAX_VALUE + 1))
+                        .fromCase(TINYINT(), Byte.MIN_VALUE - 1, new BigDecimal(Byte.MAX_VALUE))

Review Comment:
   I feel these tests are also kind of pointless if the subtraction and addition are not executed in SQL. If we cast both the input and the expected result to `byte`, it would make more sense.
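   
   To illustrate with a plain-Java sketch (hypothetical snippet, not part of the PR): the arithmetic happens in Java `int` space before any cast logic runs, so the test input is not really a `TINYINT` value at all, whereas an explicit `byte` cast wraps around:
   
   ```java
   public class ByteWrapSketch {
       public static void main(String[] args) {
           // Byte.MIN_VALUE - 1 is evaluated with Java int arithmetic,
           // so the "TINYINT" test input is really the int -129:
           int promoted = Byte.MIN_VALUE - 1;
           System.out.println(promoted); // -129
   
           // Casting the same expression to byte wraps around the range:
           byte wrapped = (byte) (Byte.MIN_VALUE - 1);
           System.out.println(wrapped); // 127, i.e. Byte.MAX_VALUE
       }
   }
   ```
   
   With the cast on both sides, the test would exercise Flink's `TINYINT` overflow semantics rather than Java's `int` arithmetic.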



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CalcTest.xml:
##########
@@ -107,14 +107,14 @@ Calc(select=[*org.apache.flink.table.planner.expressions.utils.Func24$$2da7dcb3c
   <TestCase name="testNotIn">
     <Resource name="ast">
       <![CDATA[
-LogicalFilter(condition=[OR(SEARCH($1, Sarg[(-∞..1), (1..2), (2..3), (3..4), (4..5), (5..6), (6..7), (7..8), (8..9), (9..10), (10..11), (11..12), (12..13), (13..14), (14..15), (15..16), (16..17), (17..18), (18..19), (19..20), (20..21), (21..22), (22..23), (23..24), (24..25), (25..26), (26..27), (27..28), (28..29), (29..30), (30..+∞)]), SEARCH($2, Sarg[(-∞.._UTF-16LE'xx'), (_UTF-16LE'xx'..+∞)]:CHAR(2) CHARACTER SET "UTF-16LE"))])
+LogicalFilter(condition=[OR(SEARCH($1, Sarg[(-∞..1), (1..2), (2..3), (3..4), (4..5), (5..6), (6..7), (7..8), (8..9), (9..10), (10..11), (11..12), (12..13), (13..14), (14..15), (15..16), (16..17), (17..18), (18..19), (19..20), (20..21), (21..22), (22..23), (23..24), (24..25), (25..26), (26..27), (27..28), (28..29), (29..30), (30..+∞)]), <>($2, _UTF-16LE'xx'))])

Review Comment:
   Is CALCITE-5134 really the right issue for this diff? It doesn't look related, but the plan itself makes sense.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala:
##########
@@ -96,7 +96,7 @@ class SubqueryCorrelateVariablesValidationTest extends SubQueryTestBase {
     util.verifyRelPlan(sqlQuery)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test

Review Comment:
   What was the exception before?



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml:
##########
@@ -99,4 +99,34 @@ LogicalProject(min_t3d=[$9], max_t2h=[$11])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testWithFilterJoinCorrelate">
+       <Resource name="sql">
+                       <![CDATA[
+SELECT t1a
+FROM   t1
+WHERE  EXISTS (SELECT max(t2h) FROM t2
+               LEFT OUTER JOIN t1 ttt
+               ON t2.t2a=t1.t1a)
+      ]]>
+       </Resource>
+       <Resource name="ast">
+                       <![CDATA[
+LogicalProject(t1a=[$0])
++- LogicalFilter(condition=[EXISTS({
+LogicalAggregate(group=[{}], EXPR$0=[MAX($0)])
+  LogicalProject(t2h=[$7])
+    LogicalJoin(condition=[=($0, $cor0.t1a)], joinType=[left])
+      LogicalTableScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(t2a, t2b, t2c, t2d, t2e, t2f, t2g, t2h, t2i)]]])
+      LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+       </Resource>
+       <Resource name="optimized rel plan">

Review Comment:
   This looks incorrect to me. Did some rules not fire correctly? Or is this really the correct optimization?



##########
flink-table/flink-sql-jdbc-driver/pom.xml:
##########
@@ -84,6 +84,12 @@
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+                       <scope>provided</scope>
+               </dependency>

Review Comment:
   Isn't this a problem of `FlinkSqlParserImpl`? Maybe the parser uses Guava for the first time and we don't declare its dependencies correctly?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##########
@@ -469,10 +469,10 @@ private static List<TestSetSpec> allTypesBasic() {
                         .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, 539222987)
                         .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123)
                         .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123)
-                        .fromCase(FLOAT(), 9234567891.12, 644633299)
+                        .fromCase(FLOAT(), 9234567891.12, 2147483647)

Review Comment:
   Great that it got fixed in Calcite, but this is a pretty significant change. We need to communicate it properly via the release notes.
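   
   To make the change concrete for the release notes, a plain-Java sketch (the old result 644633299 is exactly the low 32 bits of 9234567891, which suggests the previous cast narrowed through `long`; that is an assumption, not verified against the old code):
   
   ```java
   public class FloatToIntChangeSketch {
       public static void main(String[] args) {
           double d = 9234567891.12;
   
           // New behavior: like Java's own narrowing conversion,
           // the cast saturates at the target type's maximum:
           System.out.println((int) d); // 2147483647 (Integer.MAX_VALUE)
   
           // Presumed old behavior: narrowing through long keeps only
           // the low 32 bits, yielding a seemingly arbitrary number:
           System.out.println((int) (long) d); // 644633299
       }
   }
   ```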



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##########
@@ -526,7 +526,7 @@ private static List<TestSetSpec> allTypesBasic() {
                         .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, DEFAULT_NEGATIVE_BIGINT)
                         .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123L)
                         .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123L)
-                        .fromCase(FLOAT(), 9234567891.12, 9234567891L)
+                        .fromCase(FLOAT(), 9234567891.12, 9234568192L)

Review Comment:
   Can we change the FLOAT lines to `9234567891.12f` or `Float.parseFloat("9234567891.12")`? The test currently uses doubles instead of floats, which is confusing.
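   
   A quick plain-Java sketch of why the explicit float literal matters (hypothetical snippet): the double literal hides that the nearest float is a different number, which is where the new expected value comes from:
   
   ```java
   public class FloatLiteralSketch {
       public static void main(String[] args) {
           // 9234567891.12 is a double literal; a FLOAT column stores the
           // nearest float, and floats at this magnitude are 1024 apart:
           float f = Float.parseFloat("9234567891.12");
           System.out.println((long) f); // 9234568192 -- the new expected value
   
           // An explicit float literal makes the rounding visible in the test:
           System.out.println(9234567891.12f == 9234568192L); // true
       }
   }
   ```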



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml:
##########
@@ -379,30 +382,30 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`retractSink2`], fi
      <![CDATA[
 LegacySink(name=[`default_catalog`.`default_database`.`retractSink1`], fields=[a, total_c], changelogMode=[NONE])
 +- Union(all=[true], union=[a, total_c], changelogMode=[I,UB,UA])
-   :- Calc(select=[a, total_c], where=[>(a, 50)], changelogMode=[I,UB,UA])
-   :  +- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
-   :     +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT c) AS total_c], changelogMode=[I,UB,UA])
-   :        +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
-   :           +- Calc(select=[a, b, f0 AS c], changelogMode=[I])
-   :              +- Correlate(invocation=[split($cor0.c)], correlate=[table(split($cor0.c))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
-   :                 +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[AND(>=(b, UNIX_TIMESTAMP('${startTime}')), <>(c, ''))], changelogMode=[I])
+   :- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
+   :  +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT f0) AS total_c], changelogMode=[I,UB,UA])
+   :     +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+   :        +- Calc(select=[a, b, f0], where=[>(a, 50)], changelogMode=[I])
+   :           +- Correlate(invocation=[split($cor0.c)], correlate=[table(split($cor0.c))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
+   :              +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[<>(c, '')], changelogMode=[I])
+   :                 +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])

Review Comment:
   Two `Calc`s directly after each other?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java:
##########
@@ -526,7 +526,7 @@ private static List<TestSetSpec> allTypesBasic() {
                         .fromCase(BIGINT(), DEFAULT_NEGATIVE_BIGINT, DEFAULT_NEGATIVE_BIGINT)
                         .fromCase(FLOAT(), DEFAULT_POSITIVE_FLOAT, 123L)
                         .fromCase(FLOAT(), DEFAULT_NEGATIVE_FLOAT, -123L)
-                        .fromCase(FLOAT(), 9234567891.12, 9234567891L)
+                        .fromCase(FLOAT(), 9234567891.12, 9234568192L)

Review Comment:
   These are pretty significant changes :(



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml:
##########
@@ -478,33 +483,37 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink3`], fields=[t
    </Resource>
    <Resource name="optimized exec plan">
      <![CDATA[
-Union(all=[true], union=[a, c])(reuse_id=[1])
-:- Calc(select=[a, c])
-:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Calc(select=[d, f])
-   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])(reuse_id=[1])
+
+LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])(reuse_id=[2])
 
 LegacySink(name=[`default_catalog`.`default_database`.`sink1`], fields=[a, c])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[a, c])
+   :- Calc(select=[a, c])
+   :  +- Reused(reference_id=[1])
+   +- Calc(select=[d, f])

Review Comment:
   Why this change?



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml:
##########
@@ -379,30 +382,30 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`retractSink2`], fi
      <![CDATA[
 LegacySink(name=[`default_catalog`.`default_database`.`retractSink1`], fields=[a, total_c], changelogMode=[NONE])
 +- Union(all=[true], union=[a, total_c], changelogMode=[I,UB,UA])
-   :- Calc(select=[a, total_c], where=[>(a, 50)], changelogMode=[I,UB,UA])
-   :  +- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
-   :     +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT c) AS total_c], changelogMode=[I,UB,UA])
-   :        +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
-   :           +- Calc(select=[a, b, f0 AS c], changelogMode=[I])
-   :              +- Correlate(invocation=[split($cor0.c)], correlate=[table(split($cor0.c))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
-   :                 +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[AND(>=(b, UNIX_TIMESTAMP('${startTime}')), <>(c, ''))], changelogMode=[I])
+   :- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
+   :  +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT f0) AS total_c], changelogMode=[I,UB,UA])
+   :     +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+   :        +- Calc(select=[a, b, f0], where=[>(a, 50)], changelogMode=[I])
+   :           +- Correlate(invocation=[split($cor0.c)], correlate=[table(split($cor0.c))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
+   :              +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[<>(c, '')], changelogMode=[I])
+   :                 +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
   :                    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
-   +- Calc(select=[a, CAST(total_c AS BIGINT) AS total_c], where=[>(a, 50)], changelogMode=[I])
-      +- Calc(select=[a, 0 AS total_c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
+   +- Calc(select=[a, 0 AS total_c], where=[>(a, 50)], changelogMode=[I])
+      +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
 
 LegacySink(name=[`default_catalog`.`default_database`.`retractSink2`], fields=[a, total_c], changelogMode=[NONE])
 +- Union(all=[true], union=[a, total_c], changelogMode=[I,UB,UA])
-   :- Calc(select=[a, total_c], where=[<(a, 50)], changelogMode=[I,UB,UA])
-   :  +- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
-   :     +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT c) AS total_c], changelogMode=[I,UB,UA])
-   :        +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
-   :           +- Calc(select=[a, b, f0 AS c], changelogMode=[I])
-   :              +- Correlate(invocation=[split($cor0.c)], correlate=[table(split($cor0.c))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
-   :                 +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[AND(>=(b, UNIX_TIMESTAMP('${startTime}')), <>(c, ''))], changelogMode=[I])
+   :- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
+   :  +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT f0) AS total_c], changelogMode=[I,UB,UA])
+   :     +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+   :        +- Calc(select=[a, b, f0], where=[<(a, 50)], changelogMode=[I])
+   :           +- Correlate(invocation=[split($cor0.c)], correlate=[table(split($cor0.c))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
+   :              +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[<>(c, '')], changelogMode=[I])
+   :                 +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
   :                    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
-   +- Calc(select=[a, CAST(total_c AS BIGINT) AS total_c], where=[<(a, 50)], changelogMode=[I])
-      +- Calc(select=[a, 0 AS total_c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
+   +- Calc(select=[a, 0 AS total_c], where=[<(a, 50)], changelogMode=[I])
+      +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])

Review Comment:
   Shouldn't `select=[a, b, c]` be `select=[a]`?


