[
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17829374#comment-17829374
]
Jacky Lau edited comment on FLINK-34702 at 3/21/24 4:32 AM:
------------------------------------------------------------
When I tried Solution 3, I found it to be unviable for two main reasons:
# The rule cannot be placed within the physical rewrite phase. This approach
is somewhat tricky, as noted in my PoC. The reason is that whether the
MiniBatchAssigner operator works on proctime or event time depends on
the downstream operators. The current Rank operator does not require
watermarks, whereas the rowtime Deduplicate operator does. This can be
observed in the return value of requireWatermark. See the plan change in the
test {{testMiniBatchInferFirstRowOnRowtime}}.
# The RelTraitSet produced by the Deduplicate operator differs from that of
the Rank operator. Although we can modify the RelTraits of the deduplication
operator using the code below, there is no way to change the RelTraits of the
operators downstream of the deduplication operator, unless we run the
logic of the FlinkChangelogModeInferenceProgram again. The local-global
two-phase rule can be handled within the physical rewrite phase because the
RelTraitSet of the local aggregate is the same as that of the upstream node.
See the plan change in the test {{testUpdatableRankWithDeduplicate}}.
{code:java}
ModifyKindSetTrait modifyKindSetTrait;
UpdateKindTrait updateKindTrait;
if (!isLastRow && !isRowtime) {
    // only the proctime first-row deduplicate does not produce UPDATE changes
    modifyKindSetTrait = ModifyKindSetTrait.INSERT_ONLY();
    updateKindTrait = UpdateKindTrait.NONE();
} else {
    // all other deduplicate variants produce update changes,
    // so keep the update kind already inferred for the rank node
    modifyKindSetTrait = ModifyKindSetTrait.ALL_CHANGES();
    updateKindTrait = rank.getTraitSet()
            .getTrait(UpdateKindTraitDef.INSTANCE());
}
RelTraitSet duplicateTraitSet = rank.getTraitSet()
        .replace(modifyKindSetTrait)
        .replace(updateKindTrait);
{code}
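To illustrate the second point, here is a minimal toy sketch, not Flink code. Every name in it (TraitPropagationSketch, Node, ModifyKind, replaceTrait, reinfer) is hypothetical and only stands in for the planner concepts discussed above. It assumes a Calc-like node that simply forwards the changelog trait of its input, and shows that rewriting the trait of one node leaves the node above it with a stale trait until a whole-plan inference pass runs again.

```java
/** Toy model only; none of these names are Flink or Calcite APIs. */
final class TraitPropagationSketch {

    /** Hypothetical stand-in for the changelog trait of a node. */
    enum ModifyKind { INSERT_ONLY, ALL_CHANGES }

    /** Immutable plan node with a single optional input (null for a source). */
    static final class Node {
        final String name;
        final ModifyKind produces;
        final boolean forwardsInputTrait; // true for a Calc-like pass-through node
        final Node input;

        Node(String name, ModifyKind produces, boolean forwardsInputTrait, Node input) {
            this.name = name;
            this.produces = produces;
            this.forwardsInputTrait = forwardsInputTrait;
            this.input = input;
        }
    }

    /** Source -> Rank -> Calc, with the traits an earlier inference pass assigned. */
    static Node samplePlan() {
        Node source = new Node("Source", ModifyKind.INSERT_ONLY, false, null);
        Node rank = new Node("Rank", ModifyKind.ALL_CHANGES, false, source);
        return new Node("Calc", ModifyKind.ALL_CHANGES, true, rank);
    }

    /** Local rewrite: changes the trait of one node and copies everything above it as-is. */
    static Node replaceTrait(Node root, String target, ModifyKind newKind) {
        if (root == null) {
            return null;
        }
        if (root.name.equals(target)) {
            return new Node(root.name, newKind, root.forwardsInputTrait, root.input);
        }
        return new Node(root.name, root.produces, root.forwardsInputTrait,
                replaceTrait(root.input, target, newKind));
    }

    /** Whole-plan pass: re-derives the trait of every forwarding node bottom-up. */
    static Node reinfer(Node root) {
        if (root == null || root.input == null) {
            return root;
        }
        Node input = reinfer(root.input);
        ModifyKind kind = root.forwardsInputTrait ? input.produces : root.produces;
        return new Node(root.name, kind, root.forwardsInputTrait, input);
    }

    public static void main(String[] args) {
        Node rewritten = replaceTrait(samplePlan(), "Rank", ModifyKind.INSERT_ONLY);
        System.out.println("Calc after local rewrite: " + rewritten.produces);
        System.out.println("Calc after re-inference:  " + reinfer(rewritten).produces);
    }
}
```

In this toy, the local rewrite corresponds to replacing the traits of a single node inside a rewrite rule, and reinfer plays the role that rerunning the changelog mode inference would play for the real plan.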
> Rank should not convert to StreamExecDuplicate when the input is not insert
> only
> --------------------------------------------------------------------------------
>
> Key: FLINK-34702
> URL: https://issues.apache.org/jira/browse/FLINK-34702
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0
> Reporter: Jacky Lau
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
>     """
>       |SELECT *
>       |FROM (
>       |  SELECT *,
>       |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
>       |  FROM (select a, count(b) as b from MyTable group by a)
>       |)
>       |WHERE rowNum = 1
>     """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> }
> {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't
> support consuming update changes which is produced by node
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> This happens because StreamPhysicalDeduplicate cannot consume update changes
> yet, while StreamExecRank can.
> So we should not convert the FlinkLogicalRank to StreamPhysicalDeduplicate in
> this case. We can defer the check of whether the input contains update changes
> to the "optimize the physical plan" phase.
> We can add an option to solve this, and deprecate it once
> StreamPhysicalDeduplicate supports consuming update changes.