Hi Stephen,

Thanks for reporting the issue! David, good catch!

I think we have to resort to using a single state cell for buffering across checkpoints, instead of registering a new one for every checkpoint. I was under the assumption that, if the state cell was cleared, it would not be checkpointed, but that does not seem to be the case.
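Roughly, what I have in mind is a single ListState cell where every buffered element is tagged with the checkpoint id it belongs to, so the number of registered states stays constant no matter how many checkpoints the pipeline has taken. Below is an untested sketch of that idea; the class and method names (SingleCellBuffer, emitUpTo, ...) are made up for illustration and do not correspond to the current BufferingDoFnRunner code:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.common.state.ListState;

/**
 * Untested sketch: one state cell shared by all checkpoints. Each entry records
 * the checkpoint id it was buffered under, instead of living in its own
 * per-checkpoint state cell.
 */
class SingleCellBuffer<T> {

  private final ListState<KV<Long, T>> buffer;

  SingleCellBuffer(ListState<KV<Long, T>> buffer) {
    this.buffer = buffer;
  }

  /** While a checkpoint is in flight, stash the element under that checkpoint id. */
  void add(long checkpointId, T element) throws Exception {
    buffer.add(KV.of(checkpointId, element));
  }

  /** When a checkpoint is confirmed, emit and drop everything buffered up to it. */
  void emitUpTo(long confirmedCheckpointId, Consumer<T> emit) throws Exception {
    List<KV<Long, T>> remaining = new ArrayList<>();
    Iterable<KV<Long, T>> current = buffer.get();
    if (current != null) {
      for (KV<Long, T> kv : current) {
        if (kv.getKey() <= confirmedCheckpointId) {
          emit.accept(kv.getValue());
        } else {
          remaining.add(kv);
        }
      }
    }
    buffer.update(remaining); // only keep elements for still-pending checkpoints
  }
}

The downside is that every confirmed checkpoint has to walk and rewrite the whole buffer, but the set of registered states no longer grows with the number of checkpoints, which is what runs into the Flink limit described below.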
Thanks,
Max

On 21.04.20 09:29, David Morávek wrote:
> Hi Stephen,
>
> nice catch and awesome report! ;) This definitely needs a proper fix.
> I've created a new JIRA to track the issue and will try to resolve it
> soon, as this seems critical to me.
>
> https://issues.apache.org/jira/browse/BEAM-9794
>
> Thanks,
> D.
>
> On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <[email protected]> wrote:
>
> I was able to reproduce this in a unit test:
>
> @Test
> public void test() throws InterruptedException, ExecutionException {
>   FlinkPipelineOptions options =
>       PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>   options.setCheckpointingInterval(10L);
>   options.setParallelism(1);
>   options.setStreaming(true);
>   options.setRunner(FlinkRunner.class);
>   options.setFlinkMaster("[local]");
>   options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>
>   Pipeline pipeline = Pipeline.create(options);
>
>   pipeline
>       .apply(Create.of((Void) null))
>       .apply(
>           ParDo.of(
>               new DoFn<Void, Void>() {
>
>                 private static final long serialVersionUID = 1L;
>
>                 @RequiresStableInput
>                 @ProcessElement
>                 public void processElement() {}
>               }));
>
>   pipeline.run();
> }
>
> It took a while to get to checkpoint 32,767, but eventually it did,
> and it failed with the same error I listed above.
>
> On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <[email protected]> wrote:
>
> I have a Beam Pipeline (2.14) running on Flink (1.8.0, emr-5.26.0) that
> uses the RequiresStableInput feature.
>
> Currently it's configured to checkpoint once a minute, and after around
> 32000-33000 checkpoints, it fails with:
>
> 2020-04-15 13:15:02,920 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> 2020-04-15 13:15:05,762 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> 2020-04-15 13:16:02,919 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> 2020-04-15 13:16:03,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>     ... 5 more
> Caused by: java.lang.IllegalArgumentException
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>     ... 7 more
>
> The exception comes from here:
> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>
> In the Flink Runner code, I can see that each checkpoint will result in a
> new OperatorState (or KeyedState if the stream is keyed):
>
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>
> This seems to be the reason the pipeline will eventually die.
>
> While a workaround might be to increase the time between checkpoints, it
> seems like any pipeline running on Flink that uses RequiresStableInput is
> limited in the amount of time that it can run without being started from
> scratch.
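For anyone following along: the limit Stephen is running into comes from the constructor he linked above. OperatorBackendSerializationProxy stores the number of registered operator (and broadcast) states in the snapshot header as a short, so it rejects anything above Short.MAX_VALUE (32767). Paraphrasing the relevant bits of the Flink 1.8 sources from memory (not a verbatim copy):

// In the OperatorBackendSerializationProxy constructor, roughly:
Preconditions.checkArgument(
    operatorStateMetaInfoSnapshots.size() <= Short.MAX_VALUE
        && broadcastStateMetaInfoSnapshots.size() <= Short.MAX_VALUE);

// ...and when the snapshot header is written, the count goes out as a short:
out.writeShort(operatorStateMetaInfoSnapshots.size());

Since the runner currently registers one additional buffering state per checkpoint, that list grows by one on every checkpoint and eventually exceeds 32767 entries, which matches the checkpoint numbers in Stephen's logs. The sketch at the top of this mail is meant to keep that count constant.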
