[ 
https://issues.apache.org/jira/browse/FLINK-20801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lafeier updated FLINK-20801:
----------------------------
    Description: 
Using asynchronous methods in operators causes serialization problems.

The exceptions are non-deterministic; for example:
{code:java}
java.io.IOException: Corrupt stream, found tag: 21java.io.IOException: Corrupt 
stream, found tag: 21 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
 at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
 at 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
 at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
 at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
java.lang.Thread.run(Thread.java:748)

{code}
 
{code:java}
java.lang.RuntimeException: Cannot instantiate 
class.java.lang.RuntimeException: Cannot instantiate class. at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:385)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
 at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
 at 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
 at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
 at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
java.lang.Thread.run(Thread.java:748)Caused by: 
java.lang.ClassNotFoundException: e11     name12     
{code}
{code:java}
org.apache.flink.types.NullKeyFieldException: Unable to access field 
java.lang.String cn.ubattery.Person.name on object 
nullorg.apache.flink.types.NullKeyFieldException: Unable to access field 
java.lang.String cn.ubattery.Person.name on object null at 
org.apache.flink.api.java.typeutils.runtime.PojoComparator.accessField(PojoComparator.java:181)
 at 
org.apache.flink.api.java.typeutils.runtime.PojoComparator.extractKeys(PojoComparator.java:329)
 at 
org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:185)
 at 
org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:162)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:465)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:454)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:160)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
java.lang.Thread.run(Thread.java:748)
{code}
It can only be reproduced when the operators are applied in the following order:
 * flatMap (multithreaded asynchronous operations are used in this method)
 * keyBy (the problem occurs only when a keyBy operator follows)
 * flatMap


 
 {color:#00875a}*Test code (测试代码)*{color}

{code:java}
@Test
public void testConcurrentKryoException() throws Exception {
    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    List<Person> list = new ArrayList<>();

    for (int i = 0; i < 100; i++) {
        Person p = new Person("name" + i, i);
        list.add(p);
    }

    DataStreamSource<Person> ds = env.fromCollection(list);

    ds.flatMap(new FlatMapFunction<Person, Person>() {
        @Override
        public void flatMap(Person value, Collector<Person> out) throws 
Exception {
            CompletableFuture.supplyAsync(()->{
                return value;
            }).whenComplete((data,ex)->{
                out.collect(data);
            });
        }
    }).keyBy("name").flatMap(new FlatMapFunction<Person, Person>() {
        @Override
        public void flatMap(Person value, Collector<Person> out) throws 
Exception {

        }
    });

    env.execute("test");
}{code}
{code:java}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Person {
    String name;
    int age;
}
{code}


  was:
Using asynchronous methods in operators causes serialization problems.

The exceptions are non-deterministic; for example:
{code:java}
java.io.IOException: Corrupt stream, found tag: 21java.io.IOException: Corrupt 
stream, found tag: 21 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
 at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
 at 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
 at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
 at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
java.lang.Thread.run(Thread.java:748)

{code}
 
{code:java}
java.lang.RuntimeException: Cannot instantiate 
class.java.lang.RuntimeException: Cannot instantiate class. at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:385)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
 at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
 at 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
 at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
 at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
java.lang.Thread.run(Thread.java:748)Caused by: 
java.lang.ClassNotFoundException: e11     name12     
{code}
{code:java}
org.apache.flink.types.NullKeyFieldException: Unable to access field 
java.lang.String cn.ubattery.Person.name on object 
nullorg.apache.flink.types.NullKeyFieldException: Unable to access field 
java.lang.String cn.ubattery.Person.name on object null at 
org.apache.flink.api.java.typeutils.runtime.PojoComparator.accessField(PojoComparator.java:181)
 at 
org.apache.flink.api.java.typeutils.runtime.PojoComparator.extractKeys(PojoComparator.java:329)
 at 
org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:185)
 at 
org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:162)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:465)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:454)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:160)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
java.lang.Thread.run(Thread.java:748)
{code}
It can only be reproduced when the operators are applied in the following order:
 * flatMap (multithreaded asynchronous operations are used in this method)
 * keyBy (the problem occurs only when a keyBy operator follows)
 * flatMap




> Using asynchronous methods in operators causes serialization problems
> ---------------------------------------------------------------------
>
>                 Key: FLINK-20801
>                 URL: https://issues.apache.org/jira/browse/FLINK-20801
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.11.2
>         Environment: * flink 1.11.2
>  * java8
>  * windows
>            Reporter: lafeier
>            Priority: Major
>
> Using asynchronous methods in operators causes serialization problems.
> The exceptions are non-deterministic; for example:
> {code:java}
> java.io.IOException: Corrupt stream, found tag: 21java.io.IOException: 
> Corrupt stream, found tag: 21 at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
>  at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>  at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>  at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>  at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>  at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
> java.lang.Thread.run(Thread.java:748)
> {code}
>  
> {code:java}
> java.lang.RuntimeException: Cannot instantiate 
> class.java.lang.RuntimeException: Cannot instantiate class. at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:385)
>  at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
>  at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>  at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>  at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>  at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>  at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
> java.lang.Thread.run(Thread.java:748)Caused by: 
> java.lang.ClassNotFoundException: e11     name12     
> {code}
> {code:java}
> org.apache.flink.types.NullKeyFieldException: Unable to access field 
> java.lang.String cn.ubattery.Person.name on object 
> nullorg.apache.flink.types.NullKeyFieldException: Unable to access field 
> java.lang.String cn.ubattery.Person.name on object null at 
> org.apache.flink.api.java.typeutils.runtime.PojoComparator.accessField(PojoComparator.java:181)
>  at 
> org.apache.flink.api.java.typeutils.runtime.PojoComparator.extractKeys(PojoComparator.java:329)
>  at 
> org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:185)
>  at 
> org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:162)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:465)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:454)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:160)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
> java.lang.Thread.run(Thread.java:748)
> {code}
> It can only be reproduced when the operators are applied in the following 
> order:
>  * flatMap (multithreaded asynchronous operations are used in this method)
>  * keyBy (the problem occurs only when a keyBy operator follows)
>  * flatMap
>  
>  {color:#00875a}*Test code (测试代码)*{color}
> {code:java}
> @Test
> public void testConcurrentKryoException() throws Exception {
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     List<Person> list = new ArrayList<>();
>     for (int i = 0; i < 100; i++) {
>         Person p = new Person("name" + i, i);
>         list.add(p);
>     }
>     DataStreamSource<Person> ds = env.fromCollection(list);
>     ds.flatMap(new FlatMapFunction<Person, Person>() {
>         @Override
>         public void flatMap(Person value, Collector<Person> out) throws 
> Exception {
>             CompletableFuture.supplyAsync(()->{
>                 return value;
>             }).whenComplete((data,ex)->{
>                 out.collect(data);
>             });
>         }
>     }).keyBy("name").flatMap(new FlatMapFunction<Person, Person>() {
>         @Override
>         public void flatMap(Person value, Collector<Person> out) throws 
> Exception {
>         }
>     });
>     env.execute("test");
> }{code}
> {code:java}
> @Data
> @AllArgsConstructor
> @NoArgsConstructor
> public class Person {
>     String name;
>     int age;
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to