lafeier created FLINK-20801:
-------------------------------
Summary: Using asynchronous methods in operators causes
serialization problems
Key: FLINK-20801
URL: https://issues.apache.org/jira/browse/FLINK-20801
Project: Flink
Issue Type: Bug
Components: Runtime / Network
Affects Versions: 1.11.2
Environment: * flink 1.11.2
* java8
* windows
{color:#00875a}*Test code*{color}
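{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;

@Test
public void testConcurrentKryoException() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    List<Person> list = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        list.add(new Person("name" + i, i));
    }

    DataStreamSource<Person> ds = env.fromCollection(list);
    ds.flatMap(new FlatMapFunction<Person, Person>() {
        @Override
        public void flatMap(Person value, Collector<Person> out) throws Exception {
            // The callback may run on a ForkJoinPool.commonPool() thread, so
            // out.collect() can be called from outside the task thread and
            // concurrently with other callbacks.
            CompletableFuture.supplyAsync(() -> value)
                    .whenComplete((data, ex) -> out.collect(data));
        }
    }).keyBy("name").flatMap(new FlatMapFunction<Person, Person>() {
        @Override
        public void flatMap(Person value, Collector<Person> out) throws Exception {
            // Intentionally empty: the failures occur on this operator's
            // input side, before flatMap is called.
        }
    });

    env.execute("test");
}
{code}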
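{code:java}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Lombok generates the getters/setters and the public no-arg constructor
// that Flink's POJO type extraction requires.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Person {
    String name;
    int age;
}
{code}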
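The repro calls {{out.collect}} inside a {{whenComplete}} callback. The stdlib-only sketch below (the class and method names are hypothetical, chosen for illustration) shows that such a callback can run on a thread other than the one that invoked the operator: a latch holds the async supplier until the callback is registered, so the callback is executed by the thread that completes the future. In the repro, this means {{out.collect}} can be invoked from outside the Flink task thread.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class CallbackThreadDemo {

    /**
     * Returns the thread that runs a whenComplete callback which was
     * registered before the async supplier finished.
     */
    static Thread callbackThread() {
        CountDownLatch registered = new CountDownLatch(1);
        CompletableFuture<Thread> seenBy = new CompletableFuture<>();

        CompletableFuture.supplyAsync(() -> {
            try {
                // Block the async task until the callback below is registered.
                registered.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "value";
        }).whenComplete((data, ex) ->
                // In the Flink repro, out.collect(data) runs at this point.
                seenBy.complete(Thread.currentThread()));

        registered.countDown(); // let the async task finish
        return seenBy.join();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        System.out.println("callback thread: " + callbackThread().getName());
    }
}
{code}

Running {{main}} prints two different thread names; the exact name of the callback thread depends on the JVM and on the common pool's parallelism.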
Reporter: lafeier
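Emitting records from an asynchronous callback (here, {{CompletableFuture.whenComplete}}) inside an operator causes serialization failures in the downstream operator.
The exception varies from run to run; for example: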
{code:java}
java.io.IOException: Corrupt stream, found tag: 21
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
	at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
{code}
{code:java}
java.lang.RuntimeException: Cannot instantiate class.
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:385)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
	at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: e11 name12
{code}
{code:java}
org.apache.flink.types.NullKeyFieldException: Unable to access field java.lang.String cn.ubattery.Person.name on object null
	at org.apache.flink.api.java.typeutils.runtime.PojoComparator.accessField(PojoComparator.java:181)
	at org.apache.flink.api.java.typeutils.runtime.PojoComparator.extractKeys(PojoComparator.java:329)
	at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:185)
	at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:162)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:465)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:454)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:160)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
{code}
The problem reproduces only with the following operator order:
* flatMap (emits records from a multithreaded asynchronous callback)
* keyBy (the problem appears only when a keyBy follows the first flatMap)
* flatMap
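A likely explanation, consistent with the operator order above but an assumption not confirmed in this report: without {{keyBy}}, the two flatMaps are chained and records are handed over as objects, whereas {{keyBy}} introduces a network exchange in which each record is serialized into an output buffer, so concurrent {{collect}} calls can interleave bytes and corrupt the stream. Flink's supported way to do asynchronous work in an operator is its async I/O API ({{AsyncDataStream}} with an {{AsyncFunction}}). The stdlib-only sketch below (hypothetical class and method names, not Flink code) illustrates the underlying principle: async callbacks only enqueue results into a thread-safe "mailbox", and a single task thread drains it and is the only thread that touches the non-thread-safe output.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class MailboxEmitSketch {

    /**
     * Transforms each input asynchronously but emits every result from the
     * calling ("task") thread. Results arrive in completion order, not input order.
     */
    static List<String> process(List<String> inputs) {
        // Thread-safe hand-off point between async callbacks and the task thread.
        BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
        // Stands in for the non-thread-safe Collector; only the task thread touches it.
        List<String> output = new ArrayList<>();

        for (String in : inputs) {
            CompletableFuture.supplyAsync(() -> in.toUpperCase(Locale.ROOT))
                    // Enqueue exactly one entry per input so the drain loop cannot hang.
                    .whenComplete((data, ex) -> mailbox.add(ex == null ? data : "failed: " + ex));
        }

        // Drain the mailbox on the task thread: one result per input.
        for (int i = 0; i < inputs.size(); i++) {
            try {
                output.add(mailbox.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while draining mailbox", e);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            names.add("name" + i);
        }
        List<String> emitted = process(names);
        System.out.println(emitted.size() + " records emitted, e.g. " + emitted.get(0));
    }
}
{code}

The emission order is nondeterministic, similar in spirit to unordered async I/O; a real operator would also need to preserve ordering or watermark semantics where required.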