pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253795843
##########
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
##########
@@ -19,22 +19,138 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.nodes.CommonScan
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
trait StreamScan extends CommonScan[CRow] with DataStreamRel {
+ protected def convertUpsertToInternalRow(
Review comment:
Ok, I also talked a bit more with @twalthr and now I understand why we need this `switch/case`.
Regarding the code duplication, I'm not sure, but maybe it could be deduplicated by modifying the already existing `convertToInternalRow` and adding two more parameters:
1. `Row getRow(T inputRecord)`
2. `boolean getChange(T inputRecord)`
and using those two parameters:
```
new RichMapFunction[T, CRow] {
...
override def map(inputRecord: T): CRow = {
outCRow.row = getRow(inputRecord)
outCRow.change = getChange(inputRecord)
outCRow
}
}
```
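Spelled out a bit more, the parameterized helper might look roughly like this (purely a hypothetical sketch: it reuses the `convertToInternalRow` name, leaves out the existing parameters such as the schema, config and rowtime expression, and expresses the two extractors as Scala functions):
```
// Hypothetical sketch only -- not the actual StreamScan.convertToInternalRow
// signature; the existing parameters (schema, config, ...) are omitted and the
// two extractors are passed in as Scala functions.
protected def convertToInternalRow[T](
    input: DataStream[T],
    returnType: CRowTypeInfo,      // CRow type info of the output stream
    getRow: T => Row,              // extracts the data Row from the input record
    getChange: T => Boolean        // extracts the upsert/append flag
  ): DataStream[CRow] = {

  input.map(new RichMapFunction[T, CRow] {
    @transient private var outCRow: CRow = _

    override def open(parameters: Configuration): Unit = {
      // the CRow instance is mutable and reused for every record
      outCRow = new CRow(null, true)
    }

    override def map(inputRecord: T): CRow = {
      outCRow.row = getRow(inputRecord)
      outCRow.change = getChange(inputRecord)
      outCRow
    }
  }).returns(returnType)
}
```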
This modified `convertToInternalRow` method could then be called three times (see the sketch after this list):
1. for the current use case of an append-only data stream, with `getRow(record): record`, `getChange(record): true`
2. for Scala upserts, with `getRow(record): record._2`, `getChange(record): record._1`
3. for Java upserts, with `getRow(record): record.f1`, `getChange(record): record.f0`

or something along those lines.
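For illustration, the three call sites could then look something like this (again only a sketch on top of the hypothetical signature above; the stream variables and `returnType` are placeholders, and `JTuple2`/`JBool` are the aliases already imported in StreamScan.scala):
```
// Placeholder inputs -- only meant to illustrate the three call patterns.
val returnType: CRowTypeInfo = ???
val appendStream: DataStream[Row] = ???                     // append-only records
val scalaUpsertStream: DataStream[(Boolean, Row)] = ???     // Scala (flag, row) tuples
val javaUpsertStream: DataStream[JTuple2[JBool, Row]] = ??? // Java Tuple2<Boolean, Row>

// 1. Append-only stream: the record is the row, every record is an insertion.
convertToInternalRow[Row](appendStream, returnType,
  getRow = record => record,
  getChange = _ => true)

// 2. Upsert stream from the Scala API: flag in _1, data row in _2.
convertToInternalRow[(Boolean, Row)](scalaUpsertStream, returnType,
  getRow = record => record._2,
  getChange = record => record._1)

// 3. Upsert stream from the Java API: flag in f0, data row in f1.
convertToInternalRow[JTuple2[JBool, Row]](javaUpsertStream, returnType,
  getRow = record => record.f1,
  getChange = record => record.f0.booleanValue())
```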