[ https://issues.apache.org/jira/browse/FLINK-36139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vladislav Keda updated FLINK-36139:
-----------------------------------
Summary: ClassCastException when checkpointing AggregateFunction that
implements ResultTypeQueryable interface (was: ClassCastException when
checkpointing AggregateFunction)
> ClassCastException when checkpointing AggregateFunction that implements
> ResultTypeQueryable interface
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-36139
> URL: https://issues.apache.org/jira/browse/FLINK-36139
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.18.1
> Environment: Kubernetes Native Session Cluster
> Reporter: Vladislav Keda
> Priority: Major
>
> Consider a simple example of an AggregateFunction with a custom Accumulator:
>
> {code:java}
> static class BatchingFunction
>         implements AggregateFunction<Row, BatchingFunction.Accumulator, Row>, ResultTypeQueryable<Row> {
>
>     // Lombok's @Getter generates getProducedType(), which implements ResultTypeQueryable<Row>.
>     @Getter
>     private final RowTypeInfo producedType;
>
>     public BatchingFunction() {
>         this.producedType = ...
>     }
>
>     @Override
>     public Accumulator createAccumulator() {
>         return new Accumulator();
>     }
>
>     @Override
>     public Accumulator add(Row row, Accumulator acc) {
>         acc.add(row);
>         return acc;
>     }
>
>     @Override
>     public Accumulator merge(Accumulator acc1, Accumulator acc2) {
>         acc1.merge(acc2);
>         return acc1;
>     }
>
>     @Override
>     public Row getResult(Accumulator accumulator) {
>         ...
>     }
>
>     // Custom accumulator: collects every incoming Row in a list.
>     private static class Accumulator implements Serializable {
>
>         private final List<Row> rows = new ArrayList<>();
>
>         List<Row> getAll() {
>             return rows;
>         }
>
>         // Moves all rows from acc2 into this accumulator, then empties acc2.
>         Accumulator merge(Accumulator acc2) {
>             this.rows.addAll(acc2.rows);
>             acc2.clear();
>             return this;
>         }
>
>         void add(Row row) {
>             rows.add(row);
>         }
>
>         void clear() {
>             rows.clear();
>         }
>     }
> }
> {code}
> When a job that includes this BatchingFunction is resubmitted to a Flink Kubernetes session cluster with aligned
> checkpoints, the following ClassCastException appears in the JobManager logs:
>
> {code:java}
> org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.18.1.jar:1.18.1]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
>     at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 1 for operator transform -> (Sink: metrics_transform, sink: Writer -> sink: Committer) (1/1)#0.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.18.1.jar:1.18.1]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
>     at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
>     at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
>     at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:69) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:147) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.18.1.jar:1.18.1]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
>     ... 3 more
> {code}
> In this case, Flink uses the wrong serializer (RowSerializer, according to the stack trace) to write Accumulator
> objects to state. The behavior is also not consistently reproducible: when I launch the job for the first time on a
> new cluster, these errors do not occur during checkpoints.
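> The report does not pin down where the wrong serializer comes from, but the updated summary ties the failure to
> ResultTypeQueryable. One plausible mechanism, assumed here and not confirmed by the report, is that type extraction
> treats ResultTypeQueryable#getProducedType() as the answer for every type slot of the function, so the accumulator
> state would receive the RowTypeInfo (and hence RowSerializer) meant for the output. The sketch below is a simplified,
> hypothetical model of that failure mode in plain Java. FakeRow, FakeAccumulator, DeclaresResultType,
> resolveSuspected and resolvePerSlot are illustrative names, not Flink classes or APIs.
>
> {code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the failure mode suspected in this report.
// None of these types are Flink classes; the names are illustrative only.
final class SerializerSelectionSketch {

    // Stand-in for org.apache.flink.types.Row.
    static final class FakeRow {
        final List<Object> fields = new ArrayList<>();
    }

    // Stand-in for BatchingFunction.Accumulator: a buffer of rows.
    static final class FakeAccumulator {
        final List<FakeRow> rows = new ArrayList<>();
    }

    // Minimal serializer abstraction; like a state backend call site, it accepts any object.
    interface Serializer {
        String serialize(Object value);
    }

    // Like a row serializer, it casts its input to the row type before writing it.
    static final Serializer ROW_SERIALIZER = value -> {
        FakeRow row = (FakeRow) value; // ClassCastException if handed an accumulator
        return "row" + row.fields;
    };

    static final Serializer ACCUMULATOR_SERIALIZER = value -> {
        FakeAccumulator acc = (FakeAccumulator) value;
        return "acc[" + acc.rows.size() + "]";
    };

    // Plays the role of ResultTypeQueryable: the function declares its output type.
    interface DeclaresResultType {
        Serializer resultSerializer();
    }

    static final class BatchingModel implements DeclaresResultType {
        public Serializer resultSerializer() {
            return ROW_SERIALIZER;
        }
    }

    // The two type slots of an aggregate function that need serializers.
    enum Slot { ACCUMULATOR, RESULT }

    // ASSUMED faulty lookup: the declared result type answers every slot.
    static Serializer resolveSuspected(Object function, Slot slot) {
        if (function instanceof DeclaresResultType) {
            return ((DeclaresResultType) function).resultSerializer();
        }
        return slot == Slot.RESULT ? ROW_SERIALIZER : ACCUMULATOR_SERIALIZER;
    }

    // Expected lookup: the declared result type only answers the result slot.
    static Serializer resolvePerSlot(Object function, Slot slot) {
        if (slot == Slot.RESULT && function instanceof DeclaresResultType) {
            return ((DeclaresResultType) function).resultSerializer();
        }
        return slot == Slot.RESULT ? ROW_SERIALIZER : ACCUMULATOR_SERIALIZER;
    }

    // Simulates writing one non-empty accumulator to state during a snapshot.
    static String snapshot(Serializer stateSerializer) {
        FakeAccumulator acc = new FakeAccumulator();
        acc.rows.add(new FakeRow());
        return stateSerializer.serialize(acc);
    }

    public static void main(String[] args) {
        Object function = new BatchingModel();
        System.out.println(snapshot(resolvePerSlot(function, Slot.ACCUMULATOR))); // prints "acc[1]"
        try {
            snapshot(resolveSuspected(function, Slot.ACCUMULATOR));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, analogous to the one in the stack trace");
        }
    }
}
> {code}
>
> In the model, the snapshot only succeeds when the declared result type is restricted to the result slot; letting it
> answer the accumulator slot reproduces a cast failure of the same shape as the one in RowSerializer.serialize.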
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)