[
https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jiang Xin updated FLINK-31486:
------------------------------
Description:
We have the following code which uses CoGroup along with KeySelector in an
IterationBody. When we submit to Flink Session cluster, the error raises.
When we use CoGroup along with KeySelector in an IterationBody, the following
exception occurs.
{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could
not instantiate state partitioner. at
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
at
org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
at
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
at
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
at
org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at
java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1
of type org.apache.flink.api.java.functions.KeySelector in instance of
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
at
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
... 17 more {code}
was:
When we use CoGroup along with KeySelector in an IterationBody, the following
exception occurs.
{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could
not instantiate state partitioner. at
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
at
org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
at
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
at
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
at
org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at
java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1
of type org.apache.flink.api.java.functions.KeySelector in instance of
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
at
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
... 17 more {code}
> Using KeySelector in IterationBody causes ClassNotFoundException
> ----------------------------------------------------------------
>
> Key: FLINK-31486
> URL: https://issues.apache.org/jira/browse/FLINK-31486
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Reporter: Jiang Xin
> Priority: Major
> Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> We have the following code which uses CoGroup along with KeySelector in an
> IterationBody. When we submit to Flink Session cluster, the error raises.
>
> When we use CoGroup along with KeySelector in an IterationBody, the following
> exception occurs.
> {code:java}
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Could not instantiate state partitioner. at
> org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
> at
> org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
> at
> org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
> at
> org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
> at
> org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at
> java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> java.lang.invoke.SerializedLambda to field
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1
> of type org.apache.flink.api.java.functions.KeySelector in instance of
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector
> at
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
> ... 17 more {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)