LadyForest commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1446923256
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml:
##########
@@ -16,6 +16,35 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testDuplicateJoinStateTtlHint">
+ <Resource name="sql">
+ <![CDATA[select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d'), STATE_TTL('T1'
= '1d', 'T2' = '8d') */* from T1, T2, T3 where T1.a1 = T2.a2 and T2.b2 =
T3.b3]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], a3=[$4], b3=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), =($3, $5))])
+ +- LogicalJoin(condition=[true], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T2=2d, T3=3d}][STATE_TTL
inheritPath:[0, 0] options:{T1=1d, T2=8d}]]])
+ :- LogicalJoin(condition=[true], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0, 0] options:{T2=2d,
T3=3d}][STATE_TTL inheritPath:[0, 0, 0] options:{T1=1d, T2=8d}]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3]],
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Join(joinType=[InnerJoin], where=[=(b2, b3)], select=[a1, b1, a2, b2, a3, b3],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
stateTtlHints=[[[STATE_TTL options:{RIGHT=3 d}]]])
+:- Exchange(distribution=[hash[b2]])
+: +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
stateTtlHints=[[[STATE_TTL options:{RIGHT=2 d, LEFT=1 d}]]])
Review Comment:
Nit: I think it better to align the format like `RIGHT=2d, LEFT=1d`
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala:
##########
@@ -147,7 +150,20 @@ class RelTreeWriterImpl(
}
val stateTtlHints =
FlinkHints.getAllStateTtlHints(rel.asInstanceOf[Hintable].getHints)
if (stateTtlHints.nonEmpty) {
- printValues.add(Pair.of("stateTtlHints",
RelExplainUtil.hintsToString(stateTtlHints)))
+ // do not merge the state ttl hints for the original plan because
we can observe
+ // the processing of state ttl hints in the AST stage
+ val isNeedMerge =
Review Comment:
Can we move this logic to `QueryHintResolver`? I think we don't need to
merge twice here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]