This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c0c33d3 [FLINK-24158][runtime] Remove useless collector in LimitOperator
c0c33d3 is described below
commit c0c33d3f8dd39c312111de68d3c4230d89edbb10
Author: 庄天翼 <[email protected]>
AuthorDate: Sat Sep 4 21:50:04 2021 +0800
[FLINK-24158][runtime] Remove useless collector in LimitOperator
Signed-off-by: TennyZhuang <[email protected]>
This closes #17148.
---
.../flink/table/runtime/operators/sort/LimitOperator.java | 11 +----------
1 file changed, 1 insertion(+), 10 deletions(-)
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/LimitOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/LimitOperator.java
index 8baef28..791ff87 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/LimitOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/LimitOperator.java
@@ -22,8 +22,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
-import org.apache.flink.table.runtime.util.StreamRecordCollector;
-import org.apache.flink.util.Collector;
/** Operator for batch limit. TODO support stopEarly. */
public class LimitOperator extends TableStreamOperator<RowData>
@@ -33,7 +31,6 @@ public class LimitOperator extends TableStreamOperator<RowData>
private final long limitStart;
private final long limitEnd;
- private transient Collector<RowData> collector;
private transient int count = 0;
public LimitOperator(boolean isGlobal, long limitStart, long limitEnd) {
@@ -43,17 +40,11 @@ public class LimitOperator extends TableStreamOperator<RowData>
}
@Override
- public void open() throws Exception {
- super.open();
- this.collector = new StreamRecordCollector<>(output);
- }
-
- @Override
public void processElement(StreamRecord<RowData> element) throws Exception {
if (count < limitEnd) {
count++;
if (!isGlobal || count > limitStart) {
- collector.collect(element.getValue());
+ output.collect(element);
}
}
}