[ 
https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xin updated FLINK-31486:
------------------------------
    Description: 
When we use CoGroup along with a KeySelector in an IterationBody and submit the 
job to a Flink session cluster, the following exception occurs.
{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate state partitioner.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
    at org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
    at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
    at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
    at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
    ... 17 more
{code}
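
For context on why java.lang.invoke.SerializedLambda shows up in the inner 
exception: Java serialization writes a serializable lambda as a 
SerializedLambda placeholder and is expected to turn it back into the 
functional interface when reading. The message above indicates that, for the 
keySelector1 field, the placeholder was still a SerializedLambda at assignment 
time. The sketch below uses plain Java only, not Flink; KeyFn is a hypothetical 
stand-in for KeySelector, and the class and method names are illustrative 
assumptions. It does not reproduce the bug and makes no claim about its root 
cause.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;

public class SerializedLambdaDemo {

    /** Hypothetical stand-in for Flink's KeySelector; not the real interface. */
    public interface KeyFn<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    /** A lambda-based key selector, similar in spirit to one passed to a CoGroup. */
    public static KeyFn<String, Integer> sampleSelector() {
        return value -> value.length();
    }

    /** Returns the object that Java serialization writes in place of the lambda. */
    public static Object serializedForm(Serializable lambda) throws Exception {
        // Serializable lambdas get a synthetic private writeReplace() method.
        Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        return writeReplace.invoke(lambda);
    }

    /** Round-trips an object through Java serialization in the current class loader. */
    public static Object roundTrip(Serializable obj) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        KeyFn<String, Integer> selector = sampleSelector();

        // The serialized form is a SerializedLambda, not a KeyFn.
        System.out.println(serializedForm(selector).getClass().getName());

        // In a single class loader the placeholder is resolved back into a KeyFn.
        KeyFn<String, Integer> copy = (KeyFn<String, Integer>) roundTrip(selector);
        System.out.println(copy.getKey("flink"));
    }
}
```

In this single-class-loader setting the round trip succeeds, so the sketch only 
shows where the SerializedLambda type in the error message comes from.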


> Using KeySelector in IterationBody causes ClassNotFoundException
> ----------------------------------------------------------------
>
>                 Key: FLINK-31486
>                 URL: https://issues.apache.org/jira/browse/FLINK-31486
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>            Reporter: Jiang Xin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: ml-2.2.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
