Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4488#discussion_r131761767
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala
---
@@ -18,42 +18,54 @@
package org.apache.flink.table.runtime
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
import org.apache.flink.table.codegen.Compiler
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory
/**
- * MapRunner with [[CRow]] output.
+ * ProcessRunner with [[CRow]] output.
*/
-class CRowOutputMapRunner(
+class CRowOutputProcessRunner(
name: String,
code: String,
@transient var returnType: TypeInformation[CRow])
- extends RichMapFunction[Any, CRow]
+ extends ProcessFunction[Any, CRow]
with ResultTypeQueryable[CRow]
- with Compiler[MapFunction[Any, Row]] {
+ with Compiler[ProcessFunction[Any, Row]] {
val LOG = LoggerFactory.getLogger(this.getClass)
- private var function: MapFunction[Any, Row] = _
- private var outCRow: CRow = _
+ private var function: ProcessFunction[Any, Row] = _
+ private var cRowWrapper: CRowWrappingCollector = _
override def open(parameters: Configuration): Unit = {
LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name,
code)
LOG.debug("Instantiating MapFunction.")
function = clazz.newInstance()
- outCRow = new CRow(null, true)
+
+ this.cRowWrapper = new CRowWrappingCollector()
+ this.cRowWrapper.setChange(true)
}
- override def map(in: Any): CRow = {
- outCRow.row = function.map(in)
- outCRow
+ override def processElement(
+ in: Any,
+ ctx: ProcessFunction[Any, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ // remove timestamp from stream record
+ val tc = out.asInstanceOf[TimestampedCollector[_]]
--- End diff --
It is not strictly required, but it reduces the serialization overhead by one
Long value.
I added this to most functions that introduce a timestamp (ProcessFunction),
but I would also be OK with removing it.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---