This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c0c33d3  [FLINK-24158][runtime] Remove useless collector in 
LimitOperator
c0c33d3 is described below

commit c0c33d3f8dd39c312111de68d3c4230d89edbb10
Author: 庄天翼 <[email protected]>
AuthorDate: Sat Sep 4 21:50:04 2021 +0800

    [FLINK-24158][runtime] Remove useless collector in LimitOperator
    
    Signed-off-by: TennyZhuang <[email protected]>
    
    This closes #17148 .
---
 .../flink/table/runtime/operators/sort/LimitOperator.java     | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/LimitOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/LimitOperator.java
index 8baef28..791ff87 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/LimitOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/LimitOperator.java
@@ -22,8 +22,6 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
-import org.apache.flink.table.runtime.util.StreamRecordCollector;
-import org.apache.flink.util.Collector;
 
 /** Operator for batch limit. TODO support stopEarly. */
 public class LimitOperator extends TableStreamOperator<RowData>
@@ -33,7 +31,6 @@ public class LimitOperator extends 
TableStreamOperator<RowData>
     private final long limitStart;
     private final long limitEnd;
 
-    private transient Collector<RowData> collector;
     private transient int count = 0;
 
     public LimitOperator(boolean isGlobal, long limitStart, long limitEnd) {
@@ -43,17 +40,11 @@ public class LimitOperator extends 
TableStreamOperator<RowData>
     }
 
     @Override
-    public void open() throws Exception {
-        super.open();
-        this.collector = new StreamRecordCollector<>(output);
-    }
-
-    @Override
     public void processElement(StreamRecord<RowData> element) throws Exception 
{
         if (count < limitEnd) {
             count++;
             if (!isGlobal || count > limitStart) {
-                collector.collect(element.getValue());
+                output.collect(element);
             }
         }
     }

Reply via email to