[ 
https://issues.apache.org/jira/browse/FLINK-32540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-32540.
----------------------------------
    Resolution: Invalid

These types of questions should be asked on the user mailing list, Stack Overflow, 
or Slack.

> The issue of not distributing the last batch of data
> ----------------------------------------------------
>
>                 Key: FLINK-32540
>                 URL: https://issues.apache.org/jira/browse/FLINK-32540
>             Project: Flink
>          Issue Type: Bug
>         Environment: The above code was executed in IntelliJ IDEA with Flink 
> version 1.16; the issue also occurs with 1.14. Other versions have not 
> been tried.
>  
>            Reporter: 原来你是小幸运001
>            Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I copied the source code of the flatMap operator and wanted to implement my own 
> flatMap. Part of the logic is to emit one last record at the end of the 
> Flink job, so I called collector.collect in the close method, but the record 
> was not emitted and the downstream operator never received it.
> {code:java}
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.Utils;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.operators.TimestampedCollector;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
> /**
>  * Reproduction job: a single-record source feeds {@link MyStreamOperator},
>  * and a downstream flatMap prints every record it receives.
>  *
>  * @author LaiYongBIn
>  * @date 2023/7/5 10:09
>  * @Description Do SomeThing
>  */
> public class Test {
>     public static void main(String[] args) throws Exception {
>         // NOTE(review): 'parameter' is never used in this method.
>         ParameterTool parameter = ParameterTool.fromArgs(args);
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         // Source: emits the single record "TEST", prints a banner, then returns
>         // from run(). cancel() is a no-op; despite its text, the "cancel"
>         // banner is printed from run(), not from cancel().
>         DataStream<String> stream0 =
>                 env.addSource(new SourceFunction<String>() {
>                             @Override
>                             public void run(SourceContext<String> 
> sourceContext) throws Exception {
>                                 sourceContext.collect("TEST");
>                                 
> System.out.println("--------------------cancel--------------------");
>                             }
>                             @Override
>                             public void cancel() {
>                             }
>                         })
>                         .setParallelism(1);
>         // The output type is extracted from MyFlatMapFun, and the stream is then
>         // built with transform() using the custom operator instead of flatMap().
>         MyFlatMapFun flatMapFunc = new MyFlatMapFun();
>         TypeInformation<String> outType = 
> TypeExtractor.getFlatMapReturnTypes(env.clean(flatMapFunc), 
> stream0.getType(), Utils.getCallLocationName(), true);
>         DataStream<String> flatMap = stream0.transform("Flat Map", outType, 
> new MyStreamOperator(env.clean(flatMapFunc))).setParallelism(1);
>         // Downstream operator: prints each record it receives and emits nothing.
>         // NOTE(review): no sink is attached, so this println is the only output.
>         flatMap.flatMap(new FlatMapFunction<String, String>() {
>             @Override
>             public void flatMap(String s, Collector<String> collector) throws 
> Exception {
>                 // NOTE(review): on a cluster (e.g. YARN) this println runs inside
>                 // a task, so its output presumably lands in the TaskManager's
>                 // stdout (.out) file rather than the client console -- check
>                 // there before concluding that no record arrived.
>                 
> System.out.println("----------------------------------------Obtain upstream 
> data is:" + s);
>             }
>         });
>         env.execute();
>     }
> }
> /**
>  * One-input operator wrapping a FlatMapFunction. It ignores every input
>  * element (the wrapped function is never invoked) and tries to emit a single
>  * "close message" record from close().
>  */
> class MyStreamOperator extends AbstractUdfStreamOperator<String, 
> FlatMapFunction<String, String>> implements OneInputStreamOperator<String, 
> String> {
>     // Created in open() around the operator's output.
>     private transient TimestampedCollector<String> collector;
>     public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
>         super(userFunction);
>     }
>     @Override
>     public void open() throws Exception {
>         // NOTE(review): super.open() is not called, so whatever the parent
>         // operator does in open() (presumably opening the wrapped
>         // userFunction) is skipped -- confirm for the Flink version in use.
>         collector = new TimestampedCollector<>(output);
>     }
>     @Override
>     public void close() throws Exception {
>         // Distribute data during close
>         // Per this report, the record collected here never reached the
>         // downstream operator. NOTE(review): the StreamOperator#finish Javadoc
>         // (Flink 1.14+) says no more records can be produced for downstream
>         // operators once finish() has been called, which would explain the
>         // loss -- confirm. super.close() is not called either.
>         collector.collect("close message");
>     }
>     @Override
>     public void processElement(StreamRecord<String> streamRecord) throws 
> Exception {
>         // do nothing: the input is dropped and userFunction is never called
>     }
> }
> /**
>  * No-op FlatMapFunction: flatMap() emits nothing for any input. In main() it
>  * is used only to derive the output type and to construct MyStreamOperator.
>  */
>  class MyFlatMapFun implements FlatMapFunction<String, String> {
>     @Override
>     public void flatMap(String s, Collector<String> collector) throws 
> Exception {
>         // do nothing
>     }
> } {code}
> Then I found out there was a finish method, so I tried calling 
> 'collector.collect' in the finish method instead, and the record was 
> successfully emitted.
> {code:java}
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.Utils;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.operators.TimestampedCollector;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
> /**
>  * Reproduction job: a single-record source feeds {@link MyStreamOperator},
>  * and a downstream flatMap prints every record it receives.
>  *
>  * @author LaiYongBIn
>  * @date 2023/7/5 10:09
>  * @Description Do SomeThing
>  */
> public class Test {
>     public static void main(String[] args) throws Exception {
>         // NOTE(review): 'parameter' is never used in this method.
>         ParameterTool parameter = ParameterTool.fromArgs(args);
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         // Source: emits the single record "TEST", prints a banner, then returns
>         // from run(). cancel() is a no-op; despite its text, the "cancel"
>         // banner is printed from run(), not from cancel().
>         DataStream<String> stream0 =
>                 env.addSource(new SourceFunction<String>() {
>                             @Override
>                             public void run(SourceContext<String> 
> sourceContext) throws Exception {
>                                 sourceContext.collect("TEST");
>                                 
> System.out.println("--------------------cancel--------------------");
>                             }
>                             @Override
>                             public void cancel() {
>                             }
>                         })
>                         .setParallelism(1);
>         // The output type is extracted from MyFlatMapFun, and the stream is then
>         // built with transform() using the custom operator instead of flatMap().
>         MyFlatMapFun flatMapFunc = new MyFlatMapFun();
>         TypeInformation<String> outType = 
> TypeExtractor.getFlatMapReturnTypes(env.clean(flatMapFunc), 
> stream0.getType(), Utils.getCallLocationName(), true);
>         DataStream<String> flatMap = stream0.transform("Flat Map", outType, 
> new MyStreamOperator(env.clean(flatMapFunc))).setParallelism(1);
>         // Downstream operator: prints each record it receives and emits nothing.
>         // NOTE(review): no sink is attached, so this println is the only output.
>         flatMap.flatMap(new FlatMapFunction<String, String>() {
>             @Override
>             public void flatMap(String s, Collector<String> collector) throws 
> Exception {
>                 // NOTE(review): on a cluster (e.g. YARN) this println runs inside
>                 // a task, so its output presumably lands in the TaskManager's
>                 // stdout (.out) file rather than the client console -- check
>                 // there before concluding the record "was not distributed".
>                 
> System.out.println("----------------------------------------Obtain upstream 
> data is:" + s);
>             }
>         });
>         env.execute();
>     }
> }
> /**
>  * One-input operator wrapping a FlatMapFunction. It ignores every input
>  * element (the wrapped function is never invoked) and emits a single
>  * "close message" record from finish().
>  */
> class MyStreamOperator extends AbstractUdfStreamOperator<String, 
> FlatMapFunction<String, String>> implements OneInputStreamOperator<String, 
> String> {
>     // Created in open() around the operator's output.
>     private transient TimestampedCollector<String> collector;
>     public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
>         super(userFunction);
>     }
>     @Override
>     public void open() throws Exception {
>         // NOTE(review): super.open() is not called, so whatever the parent
>         // operator does in open() (presumably opening the wrapped
>         // userFunction) is skipped -- confirm for the Flink version in use.
>         collector = new TimestampedCollector<>(output);
>     }
>     @Override
>     public void close() throws Exception {
>         // Intentionally empty: the final emission was moved to finish().
>         // NOTE(review): super.close() is not called, so the parent operator's
>         // cleanup is skipped -- confirm this is intended.
>     }
>     @Override
>     public void finish() throws Exception {
>         // Distribute data during finish
>         // Per this report, this record reached the downstream operator when run
>         // locally in the IDE, but apparently not when the job ran on YARN.
>         // NOTE(review): super.finish() is not called -- confirm the parent's
>         // finish() does nothing required here.
>         collector.collect("close message");
>     }
>     @Override
>     public void processElement(StreamRecord<String> streamRecord) throws 
> Exception {
>         // do nothing: the input is dropped and userFunction is never called
>     }
> }
> /**
>  * No-op FlatMapFunction: flatMap() emits nothing for any input. In main() it
>  * is used only to derive the output type and to construct MyStreamOperator.
>  */
>  class MyFlatMapFun implements FlatMapFunction<String, String> {
>     @Override
>     public void flatMap(String s, Collector<String> collector) throws 
> Exception {
>         // do nothing
>     }
> } {code}
> But when I ran the program on YARN, the record was still not emitted. What 
> is the reason for this, and how can I solve it? I would like the last batch 
> of data to still be emitted to downstream operators when the program runs on 
> YARN.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to