nybbles opened a new issue, #27648:
URL: https://github.com/apache/beam/issues/27648

   ### What happened?
   
   Please see https://gist.github.com/nybbles/6e1f2ab31866b251ff754e22b71f8405 
for code to replicate this problem.
   
   # Problem description
   I am finding that using unbounded SDFs with Flink results in checkpoint sizes that grow without bound, which eventually causes the job to fail and all subsequent job submissions to fail. This happens even for `beam.transforms.periodicsequence.PeriodicImpulse` in the very simple pipeline given below.
   
   My pipeline consists of an SDF that reads from an unbounded source, so when there are no new messages, the SDF must poll the source with some timeout. I observed that whenever my SDF performed this polling (using `tracker.defer_remainder` as described in https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint), the checkpoint size would grow.
   
   This happened even when the unbounded source was empty, in which case my SDF simply looped: poll the unbounded source, call `tracker.defer_remainder`, and return from the `DoFn` to relinquish control until the next poll.
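
   To make the pattern concrete, here is a minimal sketch of the poll-and-defer loop I am describing, modeled on the user-initiated-checkpoint example from the programming guide. The `poll_source` helper, the restriction bounds, and the class names are placeholders rather than my actual code:
   ```python
   import apache_beam as beam
   from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
   from apache_beam.transforms.core import RestrictionProvider
   from apache_beam.utils.timestamp import Duration


   def poll_source(offset):
       # Placeholder for a read from the external unbounded source; returns [] when empty.
       return []


   class PollRestrictionProvider(RestrictionProvider):
       def initial_restriction(self, element):
           # Effectively unbounded offset range; the end is never claimed.
           return OffsetRange(0, 2**63 - 1)

       def create_tracker(self, restriction):
           return OffsetRestrictionTracker(restriction)

       def restriction_size(self, element, restriction):
           return restriction.size()


   class PollingSourceDoFn(beam.DoFn):
       POLL_DELAY_SECONDS = 5

       @beam.DoFn.unbounded_per_element()
       def process(
           self,
           element,
           tracker=beam.DoFn.RestrictionParam(PollRestrictionProvider())):
           offset = tracker.current_restriction().start
           messages = poll_source(offset)
           if not messages:
               # Source is empty: checkpoint and ask to be resumed after a delay.
               # Each such call appears to grow the Flink checkpoint.
               tracker.defer_remainder(Duration(seconds=self.POLL_DELAY_SECONDS))
               return
           for message in messages:
               if not tracker.try_claim(offset):
                   return
               yield message
               offset += 1
           tracker.defer_remainder(Duration(seconds=self.POLL_DELAY_SECONDS))
   ```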
   
   I was concerned that I had implemented my SDF or my pipeline incorrectly, so I tested `beam.transforms.periodicsequence.PeriodicImpulse` in the very simple pipeline below (note that the value of `apply_windowing` does not change the problematic behavior):
   ```python
       import apache_beam as beam
       from apache_beam.transforms.periodicsequence import PeriodicImpulse

       # runner_options (PortableOptions) is defined below
       with beam.Pipeline(options=runner_options) as pipeline:
           pcoll = pipeline | PeriodicImpulse(
               fire_interval=5, apply_windowing=True
           )
           pcoll | beam.Map(print)
   ```
   This pipeline also results in growing checkpoint size.
   
   The Flink cluster configuration, the full source of the program that reproduces the problem, and the Docker Compose file that brings up the Flink cluster are given below in the reproduction steps and in https://gist.github.com/nybbles/6e1f2ab31866b251ff754e22b71f8405.
   
   In case it is helpful, I'll list the `FLINK_PROPERTIES` and 
`PipelineOptions` below.
   
   ```yaml
         FLINK_PROPERTIES: &flink_properties |-
           historyserver.archive.clean-expired-jobs: true
           state.backend: hashmap
           state.checkpoints.dir: file:///tmp/beam_state/
           state.checkpoints.num-retained: 10
           jobmanager.rpc.address: host.docker.internal
           rest.address: host.docker.internal
           taskmanager.numberOfTaskSlots: 4
           taskmanager.memory.process.size: 2048m
   ```
   
   ```python
       runner_options = PortableOptions(
           artifact_endpoint=f"{JOB_SERVER_HOSTNAME}:8098",
           environment_cache_millis=0,
           environment_config="apache/beam_python3.11_sdk:2.48.0",
           environment_options=None,
           environment_type="DOCKER",
           job_endpoint=f"{JOB_SERVER_HOSTNAME}:8099",
           job_server_timeout=60,
           runner="PortableRunner",
           output_executable_path=None,
           sdk_worker_parallelism=0,
           state_backend="filesystem",
           state_backend_storage_path="file:///tmp/beam_state/",
           streaming=True,
           checkpointing_interval=STREAMING_CHECKPOINTING_INTERVAL,
           parallelism=1,
           auto_watermark_interval=500,
       )
   ```
   
   See this email thread for more context: 
https://lists.apache.org/thread/7yjr1f24rdzwzofdty1h12w9m28o62sm.
   
   ## Note on priority
   I followed the linked guide for setting issue priorities and set this one to priority 1 because unbounded SDFs are an important component, running on Flink is an important use case, and unbounded checkpoint size growth makes unbounded SDFs on Flink non-functional. My apologies in advance if this is the wrong priority level.
   
   # Reproduction steps
   1. Run a Flink cluster (the Gist above provides my Docker Compose-based Flink cluster; it uses a taskmanager image that can run Docker containers, which is needed for `environment_type="DOCKER"`).
   2. Run the attached driver program in the Gist above to create the job and 
submit it to the Flink cluster.
   3. Observe in the logs that the checkpoints grow in size, specifically for the step containing the SDF and in proportion to the number of calls to `tracker.defer_remainder`, even though the SDF does not explicitly accumulate any state (the checkpoint statistics can also be watched with the sketch after this list).
   4. The checkpoint size grows until Java heap space is exhausted and the job is killed. Any resubmission of the job then also fails, because the job manager attempts to restore the job from the oversized checkpoint and exhausts Java heap space again.
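
   Besides the logs, the checkpoint growth in step 3 can also be watched through Flink's checkpoint statistics REST API. Below is a minimal monitoring sketch; the REST address and port, the `JOB_ID` placeholder, and the exact response fields are assumptions based on my cluster setup rather than part of the reproduction:
   ```python
   # Hedged sketch: periodically poll Flink's checkpoint statistics and print each
   # checkpoint's id and reported state size (bytes). FLINK_REST and JOB_ID are
   # placeholders for the cluster's REST endpoint and the submitted job's ID.
   import time

   import requests

   FLINK_REST = "http://localhost:8081"
   JOB_ID = "<flink job id>"

   while True:
       stats = requests.get(f"{FLINK_REST}/jobs/{JOB_ID}/checkpoints").json()
       for checkpoint in stats.get("history", []):
           print(checkpoint.get("id"), checkpoint.get("state_size"))
       time.sleep(30)
   ```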
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

