[
https://issues.apache.org/jira/browse/FLINK-32540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Martijn Visser closed FLINK-32540.
----------------------------------
Resolution: Invalid
This type of question should be asked on the user mailing list, Stack Overflow,
or Slack.
> The issue of not distributing the last batch of data
> ----------------------------------------------------
>
> Key: FLINK-32540
> URL: https://issues.apache.org/jira/browse/FLINK-32540
> Project: Flink
> Issue Type: Bug
>            Environment: The code below was run in IntelliJ IDEA with Flink
> 1.16; the same issue also occurs in 1.14. Other versions were not tested.
>
> Reporter: 原来你是小幸运001
> Priority: Major
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I copied the source code of Flink's flat map operator in order to implement my
> own. Part of the logic is to emit one last record when the Flink job ends, so I
> called collector.collect in the close method, but the record was not emitted
> and the downstream operator never received it.
> {code:java}
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.Utils;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.operators.TimestampedCollector;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
> 
> /**
>  * @author LaiYongBIn
>  * @date 2023/7/5 10:09
>  * @Description Do SomeThing
>  */
> public class Test {
>     public static void main(String[] args) throws Exception {
>         ParameterTool parameter = ParameterTool.fromArgs(args);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStream<String> stream0 = env.addSource(new SourceFunction<String>() {
>             @Override
>             public void run(SourceContext<String> sourceContext) throws Exception {
>                 sourceContext.collect("TEST");
>                 System.out.println("--------------------cancel--------------------");
>             }
> 
>             @Override
>             public void cancel() {
>             }
>         }).setParallelism(1);
> 
>         MyFlatMapFun flatMapFunc = new MyFlatMapFun();
>         TypeInformation<String> outType = TypeExtractor.getFlatMapReturnTypes(
>                 env.clean(flatMapFunc), stream0.getType(), Utils.getCallLocationName(), true);
>         DataStream<String> flatMap = stream0
>                 .transform("Flat Map", outType, new MyStreamOperator(env.clean(flatMapFunc)))
>                 .setParallelism(1);
>         flatMap.flatMap(new FlatMapFunction<String, String>() {
>             @Override
>             public void flatMap(String s, Collector<String> collector) throws Exception {
>                 System.out.println("----------------------------------------Obtain upstream data is:" + s);
>             }
>         });
>         env.execute();
>     }
> }
> 
> class MyStreamOperator extends AbstractUdfStreamOperator<String, FlatMapFunction<String, String>>
>         implements OneInputStreamOperator<String, String> {
>     private transient TimestampedCollector<String> collector;
> 
>     public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
>         super(userFunction);
>     }
> 
>     @Override
>     public void open() throws Exception {
>         collector = new TimestampedCollector<>(output);
>     }
> 
>     @Override
>     public void close() throws Exception {
>         // Emit a final record during close()
>         collector.collect("close message");
>     }
> 
>     @Override
>     public void processElement(StreamRecord<String> streamRecord) throws Exception {
>         // do nothing
>     }
> }
> 
> class MyFlatMapFun implements FlatMapFunction<String, String> {
>     @Override
>     public void flatMap(String s, Collector<String> collector) throws Exception {
>         // do nothing
>     }
> }
> {code}
> Then I found that there is a finish method, so I tried calling
> collector.collect in the finish method instead, and the record was emitted
> successfully.
> The program is identical to the one above except for MyStreamOperator, which
> now emits the record in finish() instead of close():
> {code:java}
> class MyStreamOperator extends AbstractUdfStreamOperator<String, FlatMapFunction<String, String>>
>         implements OneInputStreamOperator<String, String> {
>     private transient TimestampedCollector<String> collector;
> 
>     public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
>         super(userFunction);
>     }
> 
>     @Override
>     public void open() throws Exception {
>         collector = new TimestampedCollector<>(output);
>     }
> 
>     @Override
>     public void close() throws Exception {
>     }
> 
>     @Override
>     public void finish() throws Exception {
>         // Emit a final record during finish()
>         collector.collect("close message");
>     }
> 
>     @Override
>     public void processElement(StreamRecord<String> streamRecord) throws Exception {
>         // do nothing
>     }
> }
> {code}
> But when I ran the program on YARN, the record was still not emitted. May I
> know the reason for this and how to solve it? I hope that when the program
> runs on YARN, the last batch of data can still be delivered to downstream
> operators.
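
[Editorial note, not part of the original report: Flink's operator lifecycle (since FLIP-147) calls finish() while the operator's output is still live, whereas close() runs during resource cleanup, which is consistent with what the reporter observed. Below is a minimal plain-Java sketch of that ordering; all class and method names are illustrative, not Flink APIs.]

```java
import java.util.ArrayList;
import java.util.List;

/**
 * A toy model of a streaming operator lifecycle. It is NOT Flink code;
 * it only illustrates the assumed ordering: records emitted in finish()
 * still reach downstream, while the output channel is released before
 * close(), so records emitted there are silently dropped.
 */
public class LifecycleSketch {

    /** Toy output channel: records collected after it closes are lost. */
    static class Output {
        final List<String> received = new ArrayList<>();
        boolean closed = false;

        void collect(String record) {
            if (!closed) {
                received.add(record);
            }
        }
    }

    interface Operator {
        void processElement(String s, Output out);
        default void finish(Output out) {}
        default void close(Output out) {}
    }

    /** Drives one operator through the assumed lifecycle ordering. */
    static List<String> run(Operator op, List<String> input) {
        Output out = new Output();
        for (String s : input) {
            op.processElement(s, out);
        }
        op.finish(out);    // output still open: a record emitted here survives
        out.closed = true; // the runtime releases the output channel here
        op.close(out);     // a record emitted here is dropped
        return out.received;
    }

    public static void main(String[] args) {
        // Emitting the final record in close(): it never reaches downstream.
        List<String> fromClose = run(new Operator() {
            @Override public void processElement(String s, Output out) {}
            @Override public void close(Output out) { out.collect("close message"); }
        }, List.of("TEST"));
        System.out.println(fromClose);   // prints []

        // Emitting it in finish(): it is delivered.
        List<String> fromFinish = run(new Operator() {
            @Override public void processElement(String s, Output out) {}
            @Override public void finish(Output out) { out.collect("final message"); }
        }, List.of("TEST"));
        System.out.println(fromFinish);  // prints [final message]
    }
}
```

The YARN-vs-IDE difference the reporter saw does not change this model; whether a late record survives depends only on whether it is emitted before or after the output is released.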
--
This message was sent by Atlassian Jira
(v8.20.10#820010)