[
https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jiang Xin updated FLINK-31486:
------------------------------
Description:
We have the following code, which uses CoGroup along with a KeySelector in an
IterationBody. When we submit it to a Flink session cluster, an exception is raised.
{code:java}
/**
 * Entry point of the reproducer: configures a single-parallelism streaming environment,
 * builds a table from a finite generated source of rows, and hands it to
 * {@code testCoGroupWithIteration}.
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception propagated from {@code testCoGroupWithIteration}
 */
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
// Very large heartbeat timeout (presumably milliseconds — confirm against
// HeartbeatManagerOptions).
config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 5000000L);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
// Environment settings used by the reproducer: RocksDB state backend, object reuse on,
// no restarts, parallelism 1, checkpointing disabled.
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getConfig().enableObjectReuse();
env.setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(1);
env.getCheckpointConfig().disableCheckpointing();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// num rows are generated in total; types (= num / 10 = 40) is both the size of the
// lookup array and the exclusive upper bound of the values stored in it.
int num = 400;
int types = num / 10;
// Fixed seed, so the generated values are the same on every run.
Random rand = new Random(0);
long[] randoms = new long[types];
for (int i = 0; i < types; i++) {
randoms[i] = rand.nextInt(types);
}
SourceFunction<Row> rowGenerator =
new SourceFunction<Row>() {
// Emits exactly num rows and then returns, so the source is finite.
@Override
public final void run(SourceContext<Row> ctx) throws Exception {
int cnt = 0;
while (cnt < num) {
// Fields 0 and 1 carry the same value (cycling through randoms);
// field 2 is a freshly constructed DenseVector(10).
ctx.collect(
Row.of(
randoms[cnt % (types)],
randoms[cnt % (types)],
new DenseVector(10)));
cnt++;
}
}
// No-op: run() has no cancellation flag to flip.
@Override
public void cancel() {}
};
// The source is named "sourceOp-1" and its row type is declared explicitly as
// (LONG, LONG, DenseVector) before the stream is turned into a table.
Table trainDataTable =
tEnv.fromDataStream(
env.addSource(rowGenerator, "sourceOp-" + 1)
.returns(
Types.ROW(
Types.LONG,
Types.LONG,
DenseVectorTypeInfo.INSTANCE)));
testCoGroupWithIteration(tEnv, trainDataTable);
}
/**
 * Converts the given table into two data streams, runs them through
 * {@code Iterations.iterateBoundedStreamsUntilTermination} with a {@code TrainIterationBody},
 * collects the first result stream and prints the number of collected elements.
 *
 * @param tEnv table environment used to convert the table into data streams
 * @param trainDataTable table that is converted twice, once per input stream
 * @throws Exception declared by the method; presumably raised by job execution via
 *     executeAndCollect — confirm
 */
public static void testCoGroupWithIteration(StreamTableEnvironment tEnv, Table
trainDataTable)
throws Exception {
// Both streams are derived from the same table.
DataStream<Row> data1 = tEnv.toDataStream(trainDataTable);
DataStream<Row> data2 = tEnv.toDataStream(trainDataTable);
// data1 is passed as the first stream list and data2 as a non-replayed stream list
// (presumably the initial variable streams and the data streams, matching the
// parameters of TrainIterationBody.process — confirm against the Iterations API).
DataStreamList coResult =
Iterations.iterateBoundedStreamsUntilTermination(
DataStreamList.of(data1),
ReplayableDataStreamList.notReplay(data2),
IterationConfig.newBuilder().build(),
new TrainIterationBody());
// NOTE(review): the list is typed List<Integer>, but the stream returned by
// TrainIterationBody appears to carry Row elements; only size() is used here.
List<Integer> counts =
IteratorUtils.toList(coResult.get(0).executeAndCollect());
System.out.println(counts.size());
}
/**
 * Iteration body of the reproducer: inside a per-round scope it co-groups the first variable
 * stream with the first data stream using lambda {@code KeySelector}s, and uses the co-group
 * result both as the stream passed back in the {@code IterationBodyResult} and as the input
 * of the termination criteria.
 */
private static class TrainIterationBody implements IterationBody {
/**
 * Builds the per-round co-group and the termination criteria.
 *
 * @param variableStreams stream list whose element 0 is the first co-group input
 * @param dataStreams stream list whose element 0 is the second co-group input
 * @return result wrapping the co-group output (passed as both the first and second
 *     argument) and the termination criteria stream
 */
@Override
public IterationBodyResult process(
DataStreamList variableStreams, DataStreamList dataStreams) {
DataStreamList feedbackVariableStream =
IterationBody.forEachRound(
dataStreams,
// The lambda parameter `input` is not used; both streams are read from
// the enclosing method's parameters instead.
input -> {
DataStream<Row> dataStream1 =
variableStreams.get(0);
DataStream<Row> dataStream2 = dataStreams.get(0);
// Co-group keyed by field 0 of the first stream and field 1 of the
// second. Both key selectors are lambdas cast to KeySelector; the
// exception reported with this code is a SerializedLambda that cannot
// be assigned to CoGroupedStreams$UnionKeySelector.keySelector1.
DataStream<Row> coResult =
dataStream1
.coGroup(dataStream2)
.where(
(KeySelector<Row, Long>)
t2 ->
t2.getFieldAs(0))
.equalTo(
(KeySelector<Row, Long>)
t2 ->
t2.getFieldAs(1))
.window(EndOfStreamWindows.get())
.apply(
new
RichCoGroupFunction<Row, Row, Row>() {
// Forwards every row of the second input; rows of the first
// input are ignored.
@Override
public void coGroup(
Iterable<Row>
iterable,
Iterable<Row>
iterable1,
Collector<Row>
collector) {
for (Row row :
iterable1) {
collector.collect(row);
}
}
});
return DataStreamList.of(coResult);
});
// Termination criteria derived from the co-group output via TerminateOnMaxIter(2)
// (presumably stops after two rounds — confirm against TerminateOnMaxIter).
DataStream<Integer> terminationCriteria =
feedbackVariableStream
.get(0)
.flatMap(new TerminateOnMaxIter(2))
.returns(Types.INT);
// The same stream list is passed as both the first and second argument
// (presumably feedback streams and output streams — confirm).
return new IterationBodyResult(
feedbackVariableStream, feedbackVariableStream,
terminationCriteria);
}
} {code}
The exception is as follows:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate state partitioner.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
    at org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
    at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
    at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
    at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
    ... 17 more
was:
We have the following code which uses CoGroup along with KeySelector in an
IterationBody. When we submit to Flink Session cluster, the error raises.
When we use CoGroup along with KeySelector in an IterationBody, the following
exception occurs.
{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could
not instantiate state partitioner. at
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
at
org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
at
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
at
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
at
org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at
java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1
of type org.apache.flink.api.java.functions.KeySelector in instance of
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
at
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
... 17 more {code}
> Using KeySelector in IterationBody causes ClassNotFoundException
> ----------------------------------------------------------------
>
> Key: FLINK-31486
> URL: https://issues.apache.org/jira/browse/FLINK-31486
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Reporter: Jiang Xin
> Priority: Major
> Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> We have the following code, which uses CoGroup along with a KeySelector in an
> IterationBody. When we submit it to a Flink session cluster, an exception is raised.
> {code:java}
> public static void main(String[] args) throws Exception {
> Configuration config = new Configuration();
> config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 5000000L);
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(config);
> env.setStateBackend(new EmbeddedRocksDBStateBackend());
> env.getConfig().enableObjectReuse();
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.setParallelism(1);
> env.getCheckpointConfig().disableCheckpointing();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> int num = 400;
> int types = num / 10;
> Random rand = new Random(0);
> long[] randoms = new long[types];
> for (int i = 0; i < types; i++) {
> randoms[i] = rand.nextInt(types);
> }
> SourceFunction<Row> rowGenerator =
> new SourceFunction<Row>() {
> @Override
> public final void run(SourceContext<Row> ctx) throws
> Exception {
> int cnt = 0;
> while (cnt < num) {
> ctx.collect(
> Row.of(
> randoms[cnt % (types)],
> randoms[cnt % (types)],
> new DenseVector(10)));
> cnt++;
> }
> }
> @Override
> public void cancel() {}
> };
> Table trainDataTable =
> tEnv.fromDataStream(
> env.addSource(rowGenerator, "sourceOp-" + 1)
> .returns(
> Types.ROW(
> Types.LONG,
> Types.LONG,
> DenseVectorTypeInfo.INSTANCE)));
> testCoGroupWithIteration(tEnv, trainDataTable);
> }
> public static void testCoGroupWithIteration(StreamTableEnvironment tEnv,
> Table trainDataTable)
> throws Exception {
> DataStream<Row> data1 = tEnv.toDataStream(trainDataTable);
> DataStream<Row> data2 = tEnv.toDataStream(trainDataTable);
> DataStreamList coResult =
> Iterations.iterateBoundedStreamsUntilTermination(
> DataStreamList.of(data1),
> ReplayableDataStreamList.notReplay(data2),
> IterationConfig.newBuilder().build(),
> new TrainIterationBody());
> List<Integer> counts =
> IteratorUtils.toList(coResult.get(0).executeAndCollect());
> System.out.println(counts.size());
> }
> private static class TrainIterationBody implements IterationBody {
> @Override
> public IterationBodyResult process(
> DataStreamList variableStreams, DataStreamList dataStreams) {
> DataStreamList feedbackVariableStream =
> IterationBody.forEachRound(
> dataStreams,
> input -> {
> DataStream<Row> dataStream1 =
> variableStreams.get(0);
> DataStream<Row> dataStream2 = dataStreams.get(0);
> DataStream<Row> coResult =
> dataStream1
> .coGroup(dataStream2)
> .where(
> (KeySelector<Row, Long>)
> t2 ->
> t2.getFieldAs(0))
> .equalTo(
> (KeySelector<Row, Long>)
> t2 ->
> t2.getFieldAs(1))
> .window(EndOfStreamWindows.get())
> .apply(
> new
> RichCoGroupFunction<Row, Row, Row>() {
> @Override
> public void coGroup(
> Iterable<Row>
> iterable,
> Iterable<Row>
> iterable1,
>
> Collector<Row> collector) {
> for (Row row :
> iterable1) {
>
> collector.collect(row);
> }
> }
> });
> return DataStreamList.of(coResult);
> });
> DataStream<Integer> terminationCriteria =
> feedbackVariableStream
> .get(0)
> .flatMap(new TerminateOnMaxIter(2))
> .returns(Types.INT);
> return new IterationBodyResult(
> feedbackVariableStream, feedbackVariableStream,
> terminationCriteria);
> }
> } {code}
> The exception is as follows:
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate state partitioner.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
>     at org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
>     at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
>     at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
>     at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector
>     at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
>     at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
>     ... 17 more
--
This message was sent by Atlassian Jira
(v8.20.10#820010)