原来你是小幸运001 created FLINK-32540:
----------------------------------
Summary: The issue of not distributing the last batch of data
Key: FLINK-32540
URL: https://issues.apache.org/jira/browse/FLINK-32540
Project: Flink
Issue Type: Bug
Environment: The code below was executed in IntelliJ IDEA against Flink
version 1.16; the same issue also occurs in 1.14. Other versions have not
been tested.
Reporter: 原来你是小幸运001
I copied the source code of the flat-map operator and wanted to implement my
own flat map. Part of the logic is to emit one final record when the Flink job
ends, so I called collector.collect in the close method, but the record was
never emitted and the downstream operator did not receive it.
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
/**
* @author LaiYongBIn
* @date 2023/7/5 10:09
* @Description Do SomeThing
*/
// Reproduction job for FLINK-32540: a bounded one-record source finishes
// immediately, and the custom operator (MyStreamOperator) tries to emit a
// final record from close(), which never reaches the downstream flat map.
public class Test {
public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Single-record source: run() returns right after collecting "TEST", so the
// whole pipeline shuts down immediately afterwards.
DataStream<String> stream0 =
env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String>
sourceContext) throws Exception {
sourceContext.collect("TEST");
System.out.println("--------------------cancel--------------------");
}
@Override
public void cancel() {
}
})
.setParallelism(1);
MyFlatMapFun flatMapFunc = new MyFlatMapFun();
// Derive the operator's output type the same way DataStream#flatMap does
// internally (via TypeExtractor).
TypeInformation<String> outType =
TypeExtractor.getFlatMapReturnTypes(env.clean(flatMapFunc), stream0.getType(),
Utils.getCallLocationName(), true);
// Plug in the hand-rolled operator instead of the built-in flat-map operator.
DataStream<String> flatMap = stream0.transform("Flat Map", outType, new
MyStreamOperator(env.clean(flatMapFunc))).setParallelism(1);
// Downstream consumer: prints whatever the custom operator emits. Per the
// report, the "close message" record never shows up here.
flatMap.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws
Exception {
System.out.println("----------------------------------------Obtain upstream
data is:" + s);
}
});
env.execute();
}
}
// Custom one-input operator modeled on Flink's flat-map operator. This is the
// variant the report is about: it emits a final record from close(), which is
// never delivered downstream.
class MyStreamOperator extends AbstractUdfStreamOperator<String,
FlatMapFunction<String, String>> implements OneInputStreamOperator<String,
String> {
// Wraps the operator's Output; created in open(), hence transient.
private transient TimestampedCollector<String> collector;
public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
super(userFunction);
}
@Override
public void open() throws Exception {
collector = new TimestampedCollector<>(output);
}
@Override
public void close() throws Exception {
// Distribute data during close
// NOTE(review): as the report demonstrates, records collected here are not
// delivered downstream — presumably the output has already been finalized
// by the time close() runs. Final records belong in finish() instead
// (see the second code sample below) — confirm against the Flink
// StreamOperator lifecycle documentation.
collector.collect("close message");
}
@Override
public void processElement(StreamRecord<String> streamRecord) throws
Exception {
// do nothing
}
}
// No-op user function; only present because AbstractUdfStreamOperator
// requires a user function instance.
class MyFlatMapFun implements FlatMapFunction<String, String> {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception
{
// do nothing
}
} {code}
Then I found out there was a finish method, and when I tried to execute
'collector.collect' in the finish method instead, the data was successfully
emitted.
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
/**
* @author LaiYongBIn
* @date 2023/7/5 10:09
* @Description Do SomeThing
*/
// Second reproduction job: identical wiring to the first sample; only the
// operator differs (it emits the final record from finish() instead of
// close()), which works locally but reportedly not on YARN.
public class Test {
public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Single-record source: run() returns right after collecting "TEST", so the
// whole pipeline shuts down immediately afterwards.
DataStream<String> stream0 =
env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String>
sourceContext) throws Exception {
sourceContext.collect("TEST");
System.out.println("--------------------cancel--------------------");
}
@Override
public void cancel() {
}
})
.setParallelism(1);
MyFlatMapFun flatMapFunc = new MyFlatMapFun();
// Derive the operator's output type the same way DataStream#flatMap does
// internally (via TypeExtractor).
TypeInformation<String> outType =
TypeExtractor.getFlatMapReturnTypes(env.clean(flatMapFunc), stream0.getType(),
Utils.getCallLocationName(), true);
// Plug in the hand-rolled operator instead of the built-in flat-map operator.
DataStream<String> flatMap = stream0.transform("Flat Map", outType, new
MyStreamOperator(env.clean(flatMapFunc))).setParallelism(1);
// Downstream consumer: prints whatever the custom operator emits.
flatMap.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws
Exception {
System.out.println("----------------------------------------Obtain upstream
data is:" + s);
}
});
env.execute();
}
}
// Second variant of the custom operator: the final record is emitted from
// finish() rather than close(). Per the report this works when run locally
// from the IDE, but the record is still missing when the job runs on YARN.
class MyStreamOperator extends AbstractUdfStreamOperator<String,
FlatMapFunction<String, String>> implements OneInputStreamOperator<String,
String> {
// Wraps the operator's Output; created in open(), hence transient.
private transient TimestampedCollector<String> collector;
public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
super(userFunction);
}
@Override
public void open() throws Exception {
collector = new TimestampedCollector<>(output);
}
@Override
public void close() throws Exception {
}
@Override
public void finish() throws Exception {
// Distribute data during finish
// NOTE(review): finish() is the lifecycle hook intended for flushing
// final records; why this still fails on YARN is the open question of
// the report — TODO confirm against the deployed Flink version's
// operator lifecycle behavior.
collector.collect("close message");
}
@Override
public void processElement(StreamRecord<String> streamRecord) throws
Exception {
// do nothing
}
}
// No-op user function; only present because AbstractUdfStreamOperator
// requires a user function instance.
class MyFlatMapFun implements FlatMapFunction<String, String> {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception
{
// do nothing
}
} {code}
But when I executed the program on YARN, the record was still not emitted. May
I know the reason for this and how to solve it? I hope that when the program
runs on YARN, the last batch of data can still be delivered to downstream
operators.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)