wuchong commented on a change in pull request #13721:
URL: https://github.com/apache/flink/pull/13721#discussion_r511707227
##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTableSourceScanRule.scala
##########
@@ -56,12 +66,57 @@ class StreamExecTableSourceScanRule
   def convert(rel: RelNode): RelNode = {
     val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
     val traitSet: RelTraitSet =
       rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-
-    new StreamExecTableSourceScan(
+    val newScan = new StreamExecTableSourceScan(
       rel.getCluster,
       traitSet,
-      scan.getTable.asInstanceOf[TableSourceTable]
-    )
+      scan.getTable.asInstanceOf[TableSourceTable])
+
+    val table = scan.getTable.asInstanceOf[TableSourceTable]
+    val tableSource = table.tableSource.asInstanceOf[ScanTableSource]
+    val changelogMode = tableSource.getChangelogMode
+    if (changelogMode.contains(RowKind.UPDATE_AFTER) &&
+        !changelogMode.contains(RowKind.UPDATE_BEFORE)) {
+      // generate upsert materialize node for upsert source
+      val primaryKey = table.catalogTable.getSchema.getPrimaryKey
+      if (!primaryKey.isPresent) {
+        throw new TableException(s"Table '${table.tableIdentifier.asSummaryString()}' produces" +
+          " a changelog stream contains UPDATE_AFTER but no UPDATE_BEFORE," +
+          " this requires to define primary key on the table.")
+      }
+      val keyFields = primaryKey.get().getColumns
+      val inputFieldNames = newScan.getRowType.getFieldNames
+      val primaryKeyIndices = getPrimaryKeyIndices(inputFieldNames, keyFields)
+      val requiredDistribution = FlinkRelDistribution.hash(primaryKeyIndices, requireStrict = true)
+      val requiredTraitSet = rel.getCluster.getPlanner.emptyTraitSet()
+        .replace(requiredDistribution)
+        .replace(FlinkConventions.STREAM_PHYSICAL)
+      val newInput: RelNode = RelOptRule.convert(newScan, requiredTraitSet)
+
+      new StreamExecUpsertMaterialize(
Review comment:
Why do we have to add a Calc here? The transformed tree has the same output
row type as the original Scan node. If the primary key fields are never used
in the following nodes, there should already be a Calc after the Scan node,
so I think we shouldn't add a Calc here.
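
For reference, a minimal sketch of what the `getPrimaryKeyIndices` helper called in the hunk above might look like. The helper itself is not shown in this hunk, so the object name, signature, and error handling below are assumptions rather than the PR's actual implementation:

```scala
// Hypothetical sketch only: the real helper is not part of the quoted hunk.
// It maps the primary key column names to their field indices in the scan's row type.
import java.util.{List => JList}
import scala.collection.JavaConverters._

object PrimaryKeyIndicesSketch {
  def getPrimaryKeyIndices(
      fieldNames: JList[String],
      keyFields: JList[String]): Array[Int] = {
    keyFields.asScala.map { keyField =>
      val index = fieldNames.indexOf(keyField)
      if (index < 0) {
        // Should not happen for a valid primary key, but guard against schema mismatches.
        throw new IllegalArgumentException(
          s"Primary key field '$keyField' is not found in the scan row type: $fieldNames")
      }
      index
    }.toArray
  }
}
```

The resulting indices are what the rule then hashes on via `FlinkRelDistribution.hash(...)`, so that all changes for a given primary key are routed to the same task before the upsert materialization.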
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]