Hi Stephen,

Nice catch and awesome report! ;) This definitely needs a proper fix. I've
created a new JIRA issue to track it and will try to resolve it soon, as it
seems critical to me.

https://issues.apache.org/jira/browse/BEAM-9794

Thanks,
D.

On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel wrote:

> I was able to reproduce this in a unit test:
>
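>   import java.util.concurrent.ExecutionException;
>   import org.apache.beam.runners.flink.FlinkPipelineOptions;
>   import org.apache.beam.runners.flink.FlinkRunner;
>   import org.apache.beam.sdk.Pipeline;
>   import org.apache.beam.sdk.options.PipelineOptionsFactory;
>   import org.apache.beam.sdk.transforms.Create;
>   import org.apache.beam.sdk.transforms.DoFn;
>   import org.apache.beam.sdk.transforms.ParDo;
>   import org.apache.flink.runtime.state.memory.MemoryStateBackend;
>   import org.junit.Test;
>
>   @Test
>   public void test() throws InterruptedException, ExecutionException {
>     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     // Checkpoint every 10 ms so the checkpoint count grows quickly.
>     options.setCheckpointingInterval(10L);
>     options.setParallelism(1);
>     options.setStreaming(true);
>     options.setRunner(FlinkRunner.class);
>     // Run on an embedded local Flink cluster.
>     options.setFlinkMaster("[local]");
>     options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         // A single (null) element; the job only needs the stable-input operator to exist.
>         .apply(Create.of((Void) null))
>         .apply(
>             ParDo.of(
>                 new DoFn<Void, Void>() {
>                   private static final long serialVersionUID = 1L;
>
>                   // @RequiresStableInput is the feature under test: the Flink
>                   // runner buffers this DoFn's input across checkpoints.
>                   @RequiresStableInput
>                   @ProcessElement
>                   public void processElement() {}
>                 }));
>     pipeline.run();
>   }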
>
> It took a while to get to checkpoint 32,767, but eventually it did, and it
> failed with the same error I listed above.
>
> On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel wrote:
>
>> I have a Beam pipeline (Beam 2.14) running on Flink (1.8.0, emr-5.26.0) that
>> uses the RequiresStableInput feature.
>>
>> It is currently configured to checkpoint once a minute, and after around
>> 32,000-33,000 checkpoints it fails with:
>>
>>> 2020-04-15 13:15:02,920 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:15:05,762 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>>> 2020-04-15 13:16:02,919 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:16:03,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>>     ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>>     ... 5 more
>>> Caused by: java.lang.IllegalArgumentException
>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>>>     ... 7 more
>>
>>
>> The exception comes from here:
>> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
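Assuming the check at that line caps the number of operator state meta-info entries in a snapshot at `Short.MAX_VALUE` (32,767), which matches the checkpoint number at which the unit test failed, the failure mode can be reproduced in isolation. The sketch below is a hypothetical simplification, not Flink's code: `StateCountCheck` and `canSnapshot` are made-up names, and the `Short.MAX_VALUE` bound is the assumption being illustrated.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified stand-in for the size check done when an operator
 * state snapshot is serialized. Assumes the number of state meta-info entries
 * must not exceed Short.MAX_VALUE.
 */
public class StateCountCheck {

  // Behaves like Flink's Preconditions.checkArgument: throws on a false condition.
  static void checkArgument(boolean condition) {
    if (!condition) {
      throw new IllegalArgumentException();
    }
  }

  /** Returns true if a snapshot with this many registered states could be written. */
  public static boolean canSnapshot(int registeredStates) {
    List<String> metaInfoSnapshots = new ArrayList<>();
    for (int i = 0; i < registeredStates; i++) {
      metaInfoSnapshots.add("state-" + i); // one meta-info entry per registered state
    }
    try {
      checkArgument(metaInfoSnapshots.size() <= Short.MAX_VALUE);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(canSnapshot(Short.MAX_VALUE));     // true
    System.out.println(canSnapshot(Short.MAX_VALUE + 1)); // false
  }
}
```

Under this assumption, one state more than 32,767 is enough to turn every subsequent snapshot into an `IllegalArgumentException`.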
>>
>> In the Flink Runner code, I can see that each checkpoint will result in a
>> new OperatorState (or KeyedState if the stream is keyed):
>>
>> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
>>
>> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>>
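>> This seems to be why the pipeline eventually dies: the number of registered
>> states grows with every checkpoint until the snapshot fails the check
>> linked above.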
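The per-checkpoint growth can be modeled in a few lines. This is a hypothetical model, not the runner's code. It assumes each checkpoint registers one new, distinctly named state (the `stateName + checkpointId` naming is illustrative), that registered states are never unregistered even after their buffers are emptied, and that a snapshot fails once more than `Short.MAX_VALUE` states are registered. `PerCheckpointStateGrowth`, `getOrRegister` and `fixedStates` are made-up names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model: a runner that registers one new buffer state per
 * checkpoint, on a backend that never removes registered states.
 */
public class PerCheckpointStateGrowth {

  // Assumed per-snapshot limit on registered states.
  static final int MAX_STATES = Short.MAX_VALUE;

  // Registered states by name. As assumed above, entries are only ever added,
  // never removed, even once a buffer has been flushed.
  private final Map<String, Object> registeredStates = new LinkedHashMap<>();

  // Stand-in for a getListState(...) call: registers the state on first use.
  public void getOrRegister(String name) {
    registeredStates.putIfAbsent(name, new Object());
  }

  /**
   * Registers fixedStates checkpoint-independent states, then simulates
   * checkpoints 1..maxCheckpoints, each buffering into its own state.
   * Returns the first checkpoint whose snapshot would exceed the limit, or -1.
   */
  public long firstFailingCheckpoint(String stateName, int fixedStates, long maxCheckpoints) {
    for (int i = 0; i < fixedStates; i++) {
      getOrRegister("fixed-state-" + i);
    }
    for (long checkpointId = 1; checkpointId <= maxCheckpoints; checkpointId++) {
      getOrRegister(stateName + checkpointId); // one new state per checkpoint
      if (registeredStates.size() > MAX_STATES) {
        return checkpointId;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    System.out.println(new PerCheckpointStateGrowth().firstFailingCheckpoint("buffer-", 0, 40_000)); // 32768
    System.out.println(new PerCheckpointStateGrowth().firstFailingCheckpoint("buffer-", 1, 40_000)); // 32767
  }
}
```

With no other states the model fails at checkpoint 32,768; with one other state registered by the same operator it fails at 32,767. In this model the exact failing checkpoint depends on what else the operator keeps in state, but the limit is always reached eventually.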
>>
>> Increasing the time between checkpoints might work as a workaround, but it
>> only postpones the failure: it seems that any pipeline running on Flink that
>> uses RequiresStableInput can only run for a limited time before it has to be
>> started from scratch.
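To put the workaround in numbers: assuming roughly one new state per checkpoint and a ceiling of `Short.MAX_VALUE` (32,767) states, the maximum runtime is about the checkpoint interval times 32,767. At one checkpoint per minute that is roughly 22.75 days, consistent with the 32,000-33,000 checkpoints reported above; a five-minute interval stretches it to roughly 113 days but does not remove the limit. The sketch ignores restarts and delayed checkpoints; `RuntimeLimit` and `timeUntilFailure` are hypothetical names.

```java
import java.time.Duration;

/**
 * Rough estimate of how long a job can run before hitting the state limit,
 * assuming one new state per checkpoint and checkpoints triggered on schedule.
 */
public class RuntimeLimit {

  // Assumed maximum number of checkpoints (one state each) before failure.
  static final long MAX_CHECKPOINTS = Short.MAX_VALUE; // 32,767

  public static Duration timeUntilFailure(Duration checkpointInterval) {
    return checkpointInterval.multipliedBy(MAX_CHECKPOINTS);
  }

  public static void main(String[] args) {
    Duration oneMinute = timeUntilFailure(Duration.ofMinutes(1));
    System.out.println(oneMinute.toHours()); // 546
    System.out.println(oneMinute.toDays());  // 22
    Duration fiveMinutes = timeUntilFailure(Duration.ofMinutes(5));
    System.out.println(fiveMinutes.toDays()); // 113
  }
}
```

The runtime scales linearly with the interval, so a longer interval trades checkpoint freshness (and recovery cost) for a later, but still certain, failure.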