[ 
https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xin updated FLINK-31486:
------------------------------
    Description: 
We have the following code, which uses CoGroup together with a KeySelector inside an 
IterationBody. When we submit it to a Flink session cluster, the exception below is raised.
{code:java}
/**
 * Reproduction entry point for FLINK-31486.
 *
 * <p>Builds a bounded source of {@code num} (400) rows typed as (LONG, LONG, DenseVector),
 * converts it to a {@link Table}, and runs {@link #testCoGroupWithIteration} on it. Per the
 * report, the failure appears when this job is submitted to a Flink session cluster.
 */
public static void main(String[] args) throws Exception {
    Configuration config = new Configuration();
    // Very large heartbeat timeout; presumably set to keep the job alive while debugging.
    config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 5000000L);
    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
    env.setStateBackend(new EmbeddedRocksDBStateBackend());
    env.getConfig().enableObjectReuse();
    // No restarts, parallelism 1 and no checkpointing, so a failure surfaces directly.
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.setParallelism(1);
    env.getCheckpointConfig().disableCheckpointing();

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Total number of rows emitted by the source.
    int num = 400;
    // Number of key slots (40); each slot holds a value drawn from [0, types).
    int types = num / 10;

    // Fixed seed so the generated keys are identical on every run.
    Random rand = new Random(0);
    long[] randoms = new long[types];
    for (int i = 0; i < types; i++) {
        randoms[i] = rand.nextInt(types);
    }

    // Bounded source: emits exactly `num` rows, then returns from run().
    // Fields 0 and 1 of each row always hold the same key value; field 2 is
    // new DenseVector(10) (presumably a zero vector of size 10 — confirm).
    SourceFunction<Row> rowGenerator =
            new SourceFunction<Row>() {
                @Override
                public final void run(SourceContext<Row> ctx) throws Exception {
                    int cnt = 0;
                    while (cnt < num) {
                        ctx.collect(
                                Row.of(
                                        randoms[cnt % (types)],
                                        randoms[cnt % (types)],
                                        new DenseVector(10)));
                        cnt++;
                    }
                }

                @Override
                public void cancel() {}
            };

    // The row type is declared explicitly as (LONG, LONG, DenseVector).
    Table trainDataTable =
            tEnv.fromDataStream(
                    env.addSource(rowGenerator, "sourceOp-" + 1)
                            .returns(
                                    Types.ROW(
                                            Types.LONG,
                                            Types.LONG,
                                            DenseVectorTypeInfo.INSTANCE)));

    testCoGroupWithIteration(tEnv, trainDataTable);
}

/**
 * Runs a bounded iteration whose body ({@link TrainIterationBody}) co-groups two streams built
 * from the same table, then collects the first output stream and prints its size.
 *
 * @param tEnv table environment used to convert {@code trainDataTable} into DataStreams
 * @param trainDataTable input table; in this repro, the (LONG, LONG, DenseVector) table from main
 */
public static void testCoGroupWithIteration(StreamTableEnvironment tEnv, Table 
trainDataTable)
        throws Exception {
    // Two DataStreams converted from the same table: data1 becomes the variable stream,
    // data2 the data stream of the iteration.
    DataStream<Row> data1 = tEnv.toDataStream(trainDataTable);
    DataStream<Row> data2 = tEnv.toDataStream(trainDataTable);
    // data2 is wrapped with notReplay (presumably: not replayed in later rounds — confirm).
    DataStreamList coResult =
            Iterations.iterateBoundedStreamsUntilTermination(
                    DataStreamList.of(data1),
                    ReplayableDataStreamList.notReplay(data2),
                    IterationConfig.newBuilder().build(),
                    new TrainIterationBody());

    // Presumably this is where the job actually executes and the reported exception surfaces.
    // NOTE(review): the iteration output carries Row elements (TrainIterationBody returns a
    // DataStream<Row>), so the List<Integer> element type is nominal; only size() is read.
    List<Integer> counts = 
IteratorUtils.toList(coResult.get(0).executeAndCollect());
    System.out.println(counts.size());
}

/**
 * Iteration body that, inside {@link IterationBody#forEachRound}, co-groups the variable stream
 * with the data stream using lambda {@link KeySelector}s.
 *
 * <p>This is the part the stack trace points at. {@code StreamConfig.getStatePartitioner} is
 * called from {@code OperatorUtils.createWrappedOperatorConfig} in the per-round wrapper
 * operator. It fails to deserialize {@code CoGroupedStreams$UnionKeySelector.keySelector1},
 * which receives a {@code java.lang.invoke.SerializedLambda} instead of a {@code KeySelector}.
 * Presumably the lambda form of the selectors matters, so keep them as lambdas when
 * reproducing.
 */
private static class TrainIterationBody implements IterationBody {

    @Override
    public IterationBodyResult process(
            DataStreamList variableStreams, DataStreamList dataStreams) {

        // Per the stack trace, operators created inside forEachRound run behind a per-round
        // wrapper (AbstractPerRoundWrapperOperator / OneInputPerRoundWrapperOperator).
        DataStreamList feedbackVariableStream =
                IterationBody.forEachRound(
                        dataStreams,
                        input -> {
                            // NOTE(review): `input` is unused; both streams are captured from
                            // the enclosing process() arguments instead. Confirm this is intended.
                            DataStream<Row> dataStream1 = 
variableStreams.get(0);
                            DataStream<Row> dataStream2 = dataStreams.get(0);

                            // Key: field 0 of the variable stream vs. field 1 of the data stream.
                            // Both selectors are lambdas cast to KeySelector<Row, Long>.
                            // EndOfStreamWindows presumably fires one window when the bounded
                            // input ends — confirm.
                            DataStream<Row> coResult =
                                    dataStream1
                                            .coGroup(dataStream2)
                                            .where(
                                                    (KeySelector<Row, Long>)
                                                            t2 -> 
t2.getFieldAs(0))
                                            .equalTo(
                                                    (KeySelector<Row, Long>)
                                                            t2 -> 
t2.getFieldAs(1))
                                            .window(EndOfStreamWindows.get())
                                            .apply(
                                                    new 
RichCoGroupFunction<Row, Row, Row>() {
                                                        // For each key group, forwards every row
                                                        // of the second input (dataStream2); rows
                                                        // of the first input are ignored.
                                                        @Override
                                                        public void coGroup(
                                                                Iterable<Row> 
iterable,
                                                                Iterable<Row> 
iterable1,
                                                                Collector<Row> 
collector) {
                                                            for (Row row : 
iterable1) {
                                                                
collector.collect(row);
                                                            }
                                                        }
                                                    });
                            return DataStreamList.of(coResult);
                        });

        // Termination stream derived from the feedback stream; TerminateOnMaxIter(2) presumably
        // ends the iteration after 2 rounds — confirm.
        DataStream<Integer> terminationCriteria =
                feedbackVariableStream
                        .get(0)
                        .flatMap(new TerminateOnMaxIter(2))
                        .returns(Types.INT);

        // The same stream list is passed twice, presumably as both the feedback variable
        // streams and the iteration outputs — confirm against the IterationBodyResult constructor.
        return new IterationBodyResult(
                feedbackVariableStream, feedbackVariableStream, 
terminationCriteria);
    }
} {code}
The exception is shown below.

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could 
not instantiate state partitioner. at 
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
 at 
org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
 at 
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
 at 
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
 at 
org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
 at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
 at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at 
java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassCastException: 
cannot assign instance of java.lang.invoke.SerializedLambda to field 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1
 of type org.apache.flink.api.java.functions.KeySelector in instance of 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
 at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
 at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
 at 
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
 ... 17 more 

  was:
We have the following code, which uses CoGroup together with a KeySelector inside an 
IterationBody. When we submit it to a Flink session cluster, the error below is raised.

 

When we use CoGroup along with KeySelector in an IterationBody, the following 
exception occurs.
{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could 
not instantiate state partitioner. at    
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
 at 
org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
 at 
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
 at 
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
 at 
org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
 at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
 at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at 
java.lang.Thread.run(Thread.java:748) 

Caused by: java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1
 of type org.apache.flink.api.java.functions.KeySelector in instance of 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
 at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
 at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
 at 
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
 ... 17 more {code}


> Using KeySelector in IterationBody causes ClassNotFoundException
> ----------------------------------------------------------------
>
>                 Key: FLINK-31486
>                 URL: https://issues.apache.org/jira/browse/FLINK-31486
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>            Reporter: Jiang Xin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: ml-2.2.0
>
>
> We have the following code, which uses CoGroup together with a KeySelector inside an 
> IterationBody. When we submit it to a Flink session cluster, the exception below is raised.
> {code:java}
> /**
>  * Reproduction entry point for FLINK-31486: builds a bounded source of {@code num} (400)
>  * rows typed as (LONG, LONG, DenseVector), converts it to a Table and runs
>  * testCoGroupWithIteration on it. Per the report, the failure appears on a session cluster.
>  */
> public static void main(String[] args) throws Exception {
>     Configuration config = new Configuration();
>     // Very large heartbeat timeout; presumably set to keep the job alive while debugging.
>     config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 5000000L);
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(config);
>     env.setStateBackend(new EmbeddedRocksDBStateBackend());
>     env.getConfig().enableObjectReuse();
>     // No restarts, parallelism 1 and no checkpointing, so a failure surfaces directly.
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     env.setParallelism(1);
>     env.getCheckpointConfig().disableCheckpointing();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>     // 400 rows in total; 40 key slots, each holding a value from [0, types).
>     int num = 400;
>     int types = num / 10;
>     // Fixed seed so the generated keys are identical on every run.
>     Random rand = new Random(0);
>     long[] randoms = new long[types];
>     for (int i = 0; i < types; i++) {
>         randoms[i] = rand.nextInt(types);
>     }
>     // Bounded source: emits exactly `num` rows whose fields 0 and 1 hold the same key.
>     SourceFunction<Row> rowGenerator =
>             new SourceFunction<Row>() {
>                 @Override
>                 public final void run(SourceContext<Row> ctx) throws 
> Exception {
>                     int cnt = 0;
>                     while (cnt < num) {
>                         ctx.collect(
>                                 Row.of(
>                                         randoms[cnt % (types)],
>                                         randoms[cnt % (types)],
>                                         new DenseVector(10)));
>                         cnt++;
>                     }
>                 }
>                 @Override
>                 public void cancel() {}
>             };
>     // The row type is declared explicitly as (LONG, LONG, DenseVector).
>     Table trainDataTable =
>             tEnv.fromDataStream(
>                     env.addSource(rowGenerator, "sourceOp-" + 1)
>                             .returns(
>                                     Types.ROW(
>                                             Types.LONG,
>                                             Types.LONG,
>                                             DenseVectorTypeInfo.INSTANCE)));
>     testCoGroupWithIteration(tEnv, trainDataTable);
> }
> /**
>  * Runs a bounded iteration whose body (TrainIterationBody) co-groups two streams built from
>  * the same table, then collects the first output stream and prints its size.
>  */
> public static void testCoGroupWithIteration(StreamTableEnvironment tEnv, 
> Table trainDataTable)
>         throws Exception {
>     // data1 becomes the variable stream, data2 the (not replayed) data stream.
>     DataStream<Row> data1 = tEnv.toDataStream(trainDataTable);
>     DataStream<Row> data2 = tEnv.toDataStream(trainDataTable);
>     DataStreamList coResult =
>             Iterations.iterateBoundedStreamsUntilTermination(
>                     DataStreamList.of(data1),
>                     ReplayableDataStreamList.notReplay(data2),
>                     IterationConfig.newBuilder().build(),
>                     new TrainIterationBody());
>     // NOTE(review): the output carries Row elements, so List<Integer> is nominal; only
>     // size() is read. Presumably the reported exception surfaces during this collection.
>     List<Integer> counts = 
> IteratorUtils.toList(coResult.get(0).executeAndCollect());
>     System.out.println(counts.size());
> }
> /**
>  * Iteration body that co-groups the variable stream with the data stream inside
>  * forEachRound, using lambda KeySelectors. Per the stack trace, deserializing
>  * CoGroupedStreams$UnionKeySelector.keySelector1 as the state partitioner fails because it
>  * receives a java.lang.invoke.SerializedLambda instead of a KeySelector.
>  */
> private static class TrainIterationBody implements IterationBody {
>     @Override
>     public IterationBodyResult process(
>             DataStreamList variableStreams, DataStreamList dataStreams) {
>         DataStreamList feedbackVariableStream =
>                 IterationBody.forEachRound(
>                         dataStreams,
>                         input -> {
>                             // NOTE(review): `input` is unused; both streams are captured
>                             // from the enclosing process() arguments. Confirm intent.
>                             DataStream<Row> dataStream1 = 
> variableStreams.get(0);
>                             DataStream<Row> dataStream2 = dataStreams.get(0);
>                             // Key: field 0 of the variable stream vs. field 1 of the data
>                             // stream, both given as lambdas cast to KeySelector<Row, Long>.
>                             DataStream<Row> coResult =
>                                     dataStream1
>                                             .coGroup(dataStream2)
>                                             .where(
>                                                     (KeySelector<Row, Long>)
>                                                             t2 -> 
> t2.getFieldAs(0))
>                                             .equalTo(
>                                                     (KeySelector<Row, Long>)
>                                                             t2 -> 
> t2.getFieldAs(1))
>                                             .window(EndOfStreamWindows.get())
>                                             .apply(
>                                                     new 
> RichCoGroupFunction<Row, Row, Row>() {
>                                                         // Forwards every row of the second
>                                                         // input; the first input is ignored.
>                                                         @Override
>                                                         public void coGroup(
>                                                                 Iterable<Row> 
> iterable,
>                                                                 Iterable<Row> 
> iterable1,
>                                                                 
> Collector<Row> collector) {
>                                                             for (Row row : 
> iterable1) {
>                                                                 
> collector.collect(row);
>                                                             }
>                                                         }
>                                                     });
>                             return DataStreamList.of(coResult);
>                         });
>         // TerminateOnMaxIter(2) presumably ends the iteration after 2 rounds — confirm.
>         DataStream<Integer> terminationCriteria =
>                 feedbackVariableStream
>                         .get(0)
>                         .flatMap(new TerminateOnMaxIter(2))
>                         .returns(Types.INT);
>         // Same stream list passed twice, presumably as feedback streams and as outputs.
>         return new IterationBodyResult(
>                 feedbackVariableStream, feedbackVariableStream, 
> terminationCriteria);
>     }
> } {code}
> The exception is shown below.
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Could not instantiate state partitioner. at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
>  at 
> org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
>  at 
> org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
>  at 
> org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
>  at 
> org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>  at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>  at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1
>  of type org.apache.flink.api.java.functions.KeySelector in instance of 
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector 
> at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
>  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) 
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) 
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>  at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>  at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
>  ... 17 more 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to