lincoln-lil commented on a change in pull request #17699:
URL: https://github.com/apache/flink/pull/17699#discussion_r757182840
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
Some(sink.copy(sinkTrait,
children.head).asInstanceOf[StreamPhysicalRel])
}
}
+
+ private def inferSinkRequiredTraits(sink: StreamPhysicalSink):
Seq[UpdateKindTrait] = {
+ val childModifyKindSet = getModifyKindSet(sink.getInput)
+ val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+ val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+ val sinkTrait = UpdateKindTrait.fromChangelogMode(
+ sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+ val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+ // if the sink's pk(s) do not exactly match the input changeLogUpsertKeys,
+ // it will fall back to beforeAndAfter mode for correctness
+ var shouldFallback: Boolean = false
+ val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+
.getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+ if (sinkDefinedPks.nonEmpty) {
+ val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+ val sinkPks =
ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+ val fmq =
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+ val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+ // if the input is UA only and the primary key != upsert key (the upsert key
+ // can be null), we should fall back to beforeAndAfter.
+ // Note: even if the sink pk(s) contain the input upsert key, we cannot
+ // optimize to UA only; this differs from the batch job's unique key inference
+ if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
Review comment:
You're right, I overlooked it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]