pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237008338
 
 

 ##########
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##########
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    if (upsertStreamTable != null) {
+      val relTypes = scan.getRowType.getFieldList.map(_.getType)
+      val timeIndicatorIndexes = relTypes.zipWithIndex
+        .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+        .map(_._2)
+      val input = if (timeIndicatorIndexes.nonEmpty) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(
 
 Review comment:
   I have revisited our previous discussion in the design Google doc and I still have the same concern as I had then:
   > In that case this is strange. It's like having filter/projection/aggregation nodes that, after pruning/removing/pushing them down, are sometimes NO-OPs and sometimes not. Besides being strange, it can have some negative side effects:
   > - the presence of a no-op node (when it doesn't need to be there) can block/mess with other optimisations or make them more complex
   > - it will pollute the explain plan printed to the user. This is important, since this will be a very costly node and it will be very hard to explain to users when this node requires huge state and when it does not
   > - it would make cost computations more complicated
   
   Back then it sparked a discussion in which you said that it would never be a no-op, and that you wanted `LastRow` to always have state and a purpose besides upsert-to-retraction conversion (filtering out empty deletes). However, that was resolved and filtering out empty deletes was dropped, which brings me back to the issue of having this as a no-op node in the plan.
   
   This connects with your other response regarding the naming of `LastRow`:
   > The LastRow node may be a no-op node in cases such as upsert source -> calc -> upsert sink, while LastRow will convert an upsert stream to a retract stream if a downstream node needs it to, such as upsert source -> calc -> retract sink. Whether to convert to a retract stream will be decided by RetractionRules.
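
   To make the two cases in the quote concrete, here is a minimal sketch of what upsert-to-retraction conversion involves. It uses hypothetical, simplified types (`Change`, `Upsert`, `Delete`, `RetractMsg`, `Add`, `Retract`, `UpsertToRetraction`), not Flink's actual `LastRow` classes or runtime API. The converter has to keep the last row per key as state; with an upsert sink downstream that state buys nothing, since the sink can consume the upserts directly.

```scala
import scala.collection.mutable

// Hypothetical input messages of an upsert stream (not Flink classes).
sealed trait Change[K, V]
final case class Upsert[K, V](key: K, value: V) extends Change[K, V]
final case class Delete[K, V](key: K) extends Change[K, V]

// Hypothetical output messages of a retract stream.
sealed trait RetractMsg[V]
final case class Add[V](value: V) extends RetractMsg[V]
final case class Retract[V](value: V) extends RetractMsg[V]

final class UpsertToRetraction[K, V] {
  // Keyed state: the last row seen for each key. This is what can grow
  // very large, which is why the node is costly.
  private val lastRow = mutable.HashMap.empty[K, V]

  def process(change: Change[K, V]): List[RetractMsg[V]] = change match {
    case Upsert(k, v) =>
      // put returns the previous row for this key, if any; retract it first.
      val retractions: List[RetractMsg[V]] = lastRow.put(k, v).toList.map(Retract(_))
      retractions :+ Add(v)
    case Delete(k) =>
      // Without a previously stored row there is nothing to retract.
      lastRow.remove(k).toList.map(Retract(_))
  }
}
```

   Feeding `Upsert("a", 1)`, `Upsert("a", 2)` and `Delete("a")` into one instance yields `Add(1)`, then `Retract(1), Add(2)`, then `Retract(2)`.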
   
   I would still argue that it should be named `UpsertToRetractionConverter` (or something along those lines) and that, if it is not needed, it should not be in the plan. Maybe this means that our `RetractionRules` are not sufficient and need some refactoring.
   
   I can see a couple of solutions to that, but probably the best would be to decide, inside the rule that is supposed to create `UpsertToRetractionConverter`, whether it needs to be inserted at all. Further optimisations/rewrites would then have to correctly handle/preserve the semantics/trait of upserts vs. retractions.
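
   A rough sketch of the "decide inside the rule" option, again with hypothetical, simplified types (`Plan`, `UpsertSource`, `Calc`, `UpdateMode`, `ConverterRule`) rather than Calcite or Flink classes: the required update mode is pushed down from the consumer, and the converter is created only where an upsert source has to feed a retraction consumer. It treats `Calc` as passing the mode through unchanged, which is a simplifying assumption.

```scala
// Hypothetical, simplified plan model; not Calcite or Flink classes.
sealed trait Plan
final case class UpsertSource(table: String) extends Plan
final case class Calc(input: Plan) extends Plan
final case class UpsertToRetractionConverter(input: Plan) extends Plan

// What a consumer requires from its input (hypothetical trait).
sealed trait UpdateMode
case object UpsertMode extends UpdateMode
case object RetractMode extends UpdateMode

object ConverterRule {
  // Pushes the required mode down the plan and inserts the converter only
  // where an upsert source must feed a retraction consumer. No converter is
  // created when upserts are acceptable, so no no-op node ends up in the plan.
  def rewrite(plan: Plan, required: UpdateMode): Plan = plan match {
    case src: UpsertSource =>
      if (required == RetractMode) UpsertToRetractionConverter(src) else src
    case Calc(input) =>
      // Simplifying assumption: Calc passes the required mode through unchanged.
      Calc(rewrite(input, required))
    case conv: UpsertToRetractionConverter =>
      // Already converted; leave it as is.
      conv
  }
}
```

   `ConverterRule.rewrite(Calc(UpsertSource("t")), UpsertMode)` returns the plan unchanged (the upsert sink case), while the same call with `RetractMode` returns `Calc(UpsertToRetractionConverter(UpsertSource("t")))`. Because the decision is made where the node would be created, later rewrites only have to preserve the upsert/retraction trait instead of reasoning about whether an existing node is a no-op.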

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure.


With regards,
Apache Git Services
