lincoln-lil commented on a change in pull request #17699:
URL: https://github.com/apache/flink/pull/17699#discussion_r754006462
##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
           Some(sink.copy(sinkTrait, children.head).asInstanceOf[StreamPhysicalRel])
         }
       }
+
+  private def inferSinkRequiredTraits(sink: StreamPhysicalSink): Seq[UpdateKindTrait] = {
+    val childModifyKindSet = getModifyKindSet(sink.getInput)
+    val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+    val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+    val sinkTrait = UpdateKindTrait.fromChangelogMode(
+      sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+    val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+      // if the sink's pk(s) do not exactly match the input changeLogUpsertKeys, fall back
+      // to beforeAndAfter mode for correctness
+      var shouldFallback: Boolean = false
+      val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+        .getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+      if (sinkDefinedPks.nonEmpty) {
+        val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+        val sinkPks = ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+        val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+        val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+        // if the input is UA only and primary key != upsert key (the upsert key can be
+        // null), we should fall back to beforeAndAfter.
+        // Notice: even if the sink pk(s) contain the input upsert key we cannot optimize
+        // to UA only; this differs from a batch job's unique key inference
+        if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
Review comment:
This check should be kept because the metadata query may return an empty `changeLogUpsertKeys` set.
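
To make the guard concrete, here is a minimal, self-contained sketch of the decision being discussed. The `needsFallback` helper and its toy inputs are illustrative only (not code from this PR); it shows why a null or empty upsert-key result from the metadata query must force the beforeAndAfter fallback:

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

import org.apache.calcite.util.ImmutableBitSet

object UpsertKeyGuardSketch {
  // Fall back to beforeAndAfter when the metadata query yields nothing
  // (null or empty), or when no upsert key equals the sink's pk bit set.
  def needsFallback(upsertKeys: ju.Set[ImmutableBitSet], sinkPks: ImmutableBitSet): Boolean =
    upsertKeys == null || upsertKeys.isEmpty ||
      !upsertKeys.asScala.exists(_.compareTo(sinkPks) == 0)

  def main(args: Array[String]): Unit = {
    val sinkPks = ImmutableBitSet.of(0) // primary key on column index 0
    println(needsFallback(null, sinkPks))                        // true: metadata unavailable
    println(needsFallback(ju.Collections.emptySet(), sinkPks))   // true: empty result set
    println(needsFallback(
      ju.Collections.singleton(ImmutableBitSet.of(0)), sinkPks)) // false: exact pk match
    println(needsFallback(
      ju.Collections.singleton(ImmutableBitSet.of(1)), sinkPks)) // true: keys differ
  }
}
```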
##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
           Some(sink.copy(sinkTrait, children.head).asInstanceOf[StreamPhysicalRel])
         }
       }
+
+  private def inferSinkRequiredTraits(sink: StreamPhysicalSink): Seq[UpdateKindTrait] = {
+    val childModifyKindSet = getModifyKindSet(sink.getInput)
+    val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+    val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+    val sinkTrait = UpdateKindTrait.fromChangelogMode(
+      sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+    val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+      // if the sink's pk(s) do not exactly match the input changeLogUpsertKeys, fall back
+      // to beforeAndAfter mode for correctness
+      var shouldFallback: Boolean = false
+      val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+        .getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+      if (sinkDefinedPks.nonEmpty) {
+        val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+        val sinkPks = ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+        val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+        val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+        // if the input is UA only and primary key != upsert key (the upsert key can be
+        // null), we should fall back to beforeAndAfter.
+        // Notice: even if the sink pk(s) contain the input upsert key we cannot optimize
+        // to UA only; this differs from a batch job's unique key inference
+        if (changeLogUpsertKeys == null || changeLogUpsertKeys.size() == 0
+          || !changeLogUpsertKeys.exists { 0 == _.compareTo(sinkPks) }) {
+          shouldFallback = true
+        }
+      }
+      if (shouldFallback) {
+        Seq(beforeAndAfter)
+      } else {
+        Seq(onlyAfter, beforeAndAfter)
+      }
+    } else if (sinkTrait.equals(BEFORE_AND_AFTER)) {
+      Seq(beforeAndAfter)
+    } else {
+      Seq(UpdateKindTrait.NONE)
+    }
+    sinkRequiredTraits
+  }
+
+  private def analyzeUpsertMaterializeStrategy(sink: StreamPhysicalSink): Boolean = {
Review comment:
Initially I put the two methods together, but it seemed a little complex, and the two methods really do different things, so I changed to the current version.
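
To illustrate the separation of concerns, here is a self-contained toy sketch (all names below are stand-ins, not the Flink planner API): trait inference and upsert-materialize analysis stay independent, which is what keeping them as two methods buys.

```scala
object SinkAnalysisSketch {
  sealed trait UpdateKind
  case object OnlyAfter extends UpdateKind
  case object BeforeAndAfter extends UpdateKind

  // Stand-in for the sink properties the two analyses care about.
  final case class FakeSink(pkMatchesUpsertKey: Boolean, inputHasUpdates: Boolean)

  // Concern 1: which update-kind traits to request from the input.
  def inferSinkRequiredTraits(sink: FakeSink): Seq[UpdateKind] =
    if (sink.pkMatchesUpsertKey) Seq(OnlyAfter, BeforeAndAfter) else Seq(BeforeAndAfter)

  // Concern 2: whether the sink needs an upsert materializer.
  def analyzeUpsertMaterializeStrategy(sink: FakeSink): Boolean =
    sink.inputHasUpdates && !sink.pkMatchesUpsertKey

  def main(args: Array[String]): Unit = {
    val sink = FakeSink(pkMatchesUpsertKey = false, inputHasUpdates = true)
    println(inferSinkRequiredTraits(sink))          // List(BeforeAndAfter)
    println(analyzeUpsertMaterializeStrategy(sink)) // true
  }
}
```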
##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
           Some(sink.copy(sinkTrait, children.head).asInstanceOf[StreamPhysicalRel])
         }
       }
+
+  private def inferSinkRequiredTraits(sink: StreamPhysicalSink): Seq[UpdateKindTrait] = {
+    val childModifyKindSet = getModifyKindSet(sink.getInput)
+    val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+    val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+    val sinkTrait = UpdateKindTrait.fromChangelogMode(
+      sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+    val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+      // if the sink's pk(s) do not exactly match the input changeLogUpsertKeys, fall back
+      // to beforeAndAfter mode for correctness
+      var shouldFallback: Boolean = false
+      val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
Review comment:
Good point! I simply moved the code from `StreamPhysicalSink` to here; `getPrimaryKeyIndexes` is simpler.
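
For readers, the simplification presumably looks like the following fragment (a sketch, assuming `ResolvedSchema#getPrimaryKeyIndexes` returns the pk column indexes as an `int[]`), replacing the name-to-index mapping in the diff:

```scala
// Sketch: let the resolved schema return the pk indexes directly,
// instead of resolving pk column names and mapping them to indexes.
val sinkPks = ImmutableBitSet.of(
  sink.catalogTable.getResolvedSchema.getPrimaryKeyIndexes: _*)
```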
##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -759,6 +754,84 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
           Some(sink.copy(sinkTrait, children.head).asInstanceOf[StreamPhysicalRel])
         }
       }
+
+  private def inferSinkRequiredTraits(sink: StreamPhysicalSink): Seq[UpdateKindTrait] = {
+    val childModifyKindSet = getModifyKindSet(sink.getInput)
+    val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+    val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+    val sinkTrait = UpdateKindTrait.fromChangelogMode(
+      sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+    val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+      // if the sink's pk(s) do not exactly match the input changeLogUpsertKeys, fall back
+      // to beforeAndAfter mode for correctness
+      var shouldFallback: Boolean = false
Review comment:
Yes, it's clearer.
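
Presumably the agreed cleanup looks roughly like this sketch (names come from the diff above; the exact committed refactor is assumed, not quoted): the fallback decision becomes a single `val` expression rather than a mutated `var`.

```scala
// Sketch: compute the decision as one expression, no mutable state.
// The bare .exists on a java.util.Set relies on the same implicit
// Java-to-Scala collection conversion the surrounding file already uses.
val shouldFallback: Boolean = sinkDefinedPks.nonEmpty && {
  val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
  val sinkPks = ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
  val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
  val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
  changeLogUpsertKeys == null || changeLogUpsertKeys.isEmpty ||
    !changeLogUpsertKeys.exists(_.compareTo(sinkPks) == 0)
}
```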
##########
File path: docs/layouts/shortcodes/generated/execution_config_configuration.html
##########
@@ -58,6 +58,12 @@
             <td><p>Enum</p></td>
             <td>The NOT NULL column constraint on a table enforces that null values can't be inserted into the table. Flink supports 'error' (default) and 'drop' enforcement behavior. By default, Flink checks values and throws a runtime exception when null values are written into NOT NULL columns. Users can change the behavior to 'drop' to silently drop such records without throwing an exception.<br /><br />Possible values:<ul><li>"ERROR"</li><li>"DROP"</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>table.exec.sink.pk-shuffle</h5><br> <span class="label label-primary">Streaming</span></td>
Review comment:
Okay, I thought the target issue was the same one. I'll create a separate PR.
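
As background for this generated table: execution options like the ones documented here are set on the `TableConfig`. A minimal sketch using the NOT NULL enforcer option whose description appears in the context lines above (the option key is recalled from Flink's docs rather than shown in this hunk, so treat it as an assumption):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Sketch: switch NOT NULL enforcement from the default 'error' to 'drop',
// per the behavior described in the generated documentation above.
val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
tableEnv.getConfig.getConfiguration
  .setString("table.exec.sink.not-null-enforcer", "DROP")
```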
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]