GitHub user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5327#discussion_r182445400
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CRowWrappingMultiOutputCollector.scala
---
@@ -16,35 +16,61 @@
* limitations under the License.
*/
-package org.apache.flink.table.runtime
+package org.apache.flink.table.runtime.join
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
/**
- * The collector to wrap a [[Row]] into a [[CRow]] and collect it
multiple times.
+ * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times. This collector
+ * can also be used to count the number of emitted records and to do lazy output.
*/
class CRowWrappingMultiOutputCollector() extends Collector[Row] {
private var out: Collector[CRow] = _
- private val outCRow: CRow = new CRow()
+ private val outCRow: CRow = new CRow(null, true)
+ // times for collect
private var times: Long = 0L
+ // count how many records have been emitted
+ private var emitCnt: Long = 0L
+ // don't collect to downstream if set lazyOutput to true
+ private var lazyOutput: Boolean = false
def setCollector(collector: Collector[CRow]): Unit = this.out = collector
def setChange(change: Boolean): Unit = this.outCRow.change = change
+ def setRow(row: Row): Unit = this.outCRow.row = row
+
+ def getRow(): Row = this.outCRow.row
+
def setTimes(times: Long): Unit = this.times = times
+ def setEmitCnt(emitted: Long): Unit = this.emitCnt = emitted
+
+ def getEmitCnt(): Long = emitCnt
+
+ def setLazyOutput(lazyOutput: Boolean): Unit = this.lazyOutput =
lazyOutput
+
override def collect(record: Row): Unit = {
outCRow.row = record
- var i: Long = 0L
- while (i < times) {
- out.collect(outCRow)
- i += 1
+ if (!lazyOutput) {
+ emitCnt += times
+ var i: Long = 0L
+ while (i < times) {
+ out.collect(outCRow)
+ i += 1
+ }
}
}
+ def reset(): Unit = {
+ this.outCRow.change = true
--- End diff --
Remove this line. The `change` flag must be set explicitly after every `reset()` call anyway, so resetting it here is redundant.
---