Hi Stephen,

nice catch and awesome report! ;) This definitely needs a proper fix. I've created a new JIRA to track the issue and will try to resolve it soon, as this seems critical to me.

https://issues.apache.org/jira/browse/BEAM-9794

Thanks,
D.

On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <[email protected]> wrote:

> I was able to reproduce this in a unit test:
>
>> @Test
>> public void test() throws InterruptedException, ExecutionException {
>>   FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>>   options.setCheckpointingInterval(10L);
>>   options.setParallelism(1);
>>   options.setStreaming(true);
>>   options.setRunner(FlinkRunner.class);
>>   options.setFlinkMaster("[local]");
>>   options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>>   Pipeline pipeline = Pipeline.create(options);
>>   pipeline
>>       .apply(Create.of((Void) null))
>>       .apply(
>>           ParDo.of(
>>               new DoFn<Void, Void>() {
>>                 private static final long serialVersionUID = 1L;
>>
>>                 @RequiresStableInput
>>                 @ProcessElement
>>                 public void processElement() {}
>>               }));
>>   pipeline.run();
>> }
>
> It took a while to get to checkpoint 32,767, but eventually it did, and it
> failed with the same error I listed above.
>
> On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <[email protected]>
> wrote:
>
>> I have a Beam Pipeline (2.14) running on Flink (1.8.0, emr-5.26.0) that
>> uses the RequiresStableInput feature.
>>
>> Currently it's configured to checkpoint once a minute, and after around
>> 32000-33000 checkpoints, it fails with:
>>
>>> 2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>>> 2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>>     ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>>     ... 5 more
>>> Caused by: java.lang.IllegalArgumentException
>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>>>     ... 7 more
>>
>> The exception comes from here:
>> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>>
>> In the Flink Runner code, I can see that each checkpoint will result in a
>> new OperatorState (or KeyedState if the stream is keyed):
>>
>> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
>>
>> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>>
>> This seems to be the reason the pipeline will eventually die.
>>
>> While a workaround might be to increase the time between checkpoints, it
>> seems like any pipeline running on flink, using the RequiresStableInput is
>> limited in the amount of time that it can run without being started from
>> scratch.
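
To make the failure mode described in the quoted report concrete, here is a minimal standalone sketch in plain Java. It uses no Beam or Flink classes. It rests on two assumptions that are not confirmed in this thread: that the precondition at the linked OperatorBackendSerializationProxy line rejects snapshots with more than Short.MAX_VALUE registered states, and that exactly one new state is registered per checkpoint and never removed. The class name, method names and the "buffer-N" state names are hypothetical and only serve the illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class StateLimitSketch {

    // ASSUMPTION: the snapshot refuses more than Short.MAX_VALUE (32767)
    // registered states. This mirrors the suspected precondition, not Flink code.
    static final int MAX_REGISTERED_STATES = Short.MAX_VALUE;

    /** Simulated snapshot: throws once too many states are registered. */
    static void snapshot(List<String> registeredStateNames) {
        if (registeredStateNames.size() > MAX_REGISTERED_STATES) {
            throw new IllegalArgumentException();
        }
    }

    /**
     * Registers one new state per checkpoint (hypothetical naming) and returns
     * the 1-based number of the first checkpoint whose snapshot is rejected.
     */
    static int firstFailingCheckpoint() {
        List<String> registered = new ArrayList<>();
        for (int checkpoint = 1; ; checkpoint++) {
            registered.add("buffer-" + checkpoint);
            try {
                snapshot(registered);
            } catch (IllegalArgumentException e) {
                return checkpoint;
            }
        }
    }

    /** Wall-clock time needed to trigger the given number of checkpoints. */
    static Duration timeToReach(int checkpoints, Duration interval) {
        return interval.multipliedBy(checkpoints);
    }

    public static void main(String[] args) {
        int failing = firstFailingCheckpoint();
        System.out.println("first failing checkpoint in this model: " + failing);
        System.out.println("days at one checkpoint per minute: "
                + timeToReach(failing, Duration.ofMinutes(1)).toDays());
        System.out.println("days at one checkpoint per 10 minutes: "
                + timeToReach(failing, Duration.ofMinutes(10)).toDays());
    }
}
```

Under these assumptions, a longer checkpoint interval only postpones the failure in proportion to the interval; it does not remove the limit. The exact failing checkpoint in the reports above (32702 in production, around 32,767 in the unit test) differs slightly from this simplified model, so the one-state-per-checkpoint assumption should be read as an approximation.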
