AMMJ93 opened a new issue, #32924:
URL: https://github.com/apache/beam/issues/32924

   ### What happened?
   
   ### Description
   I have written an External Transform for the SolaceIO Read to make it 
available for use in Python. When using the Solace IO connector via the 
expansion service in a Python pipeline with Flink runner, the checkpoint state 
size keeps growing in the cross-language boundary operator without there being 
any throughput. This happens despite stable state sizes in the Java source 
splits. The state eventually becomes too large and the pipeline crashes.
   
   ### Environment
   - Apache Beam Python SDK 2.59.0
   - Local Flink Runner 1.18.1 cluster
   - Java Expansion Service for Solace IO 2.59.0
   - RocksDB State Backend
   
   ### Pipeline Structure
   1. Java side (Expansion Service):
      - Multiple Solace source splits depending on number of connections -> 
`PCollectionList<byte[]>`
      - Each split maintains stable state (~10.8KB)
      
   2. Cross-Language Boundary:
      - State grows continuously
      - Checkpoint metrics show increasing sizes (4.27MB -> 5.40MB) every 
checkpoint
      - Growth happens regardless of operation type (Flatten, Map, etc.)
   
   ### Metrics Evidence
   Source splits (stable):
   ```
   Source: Impulse -> [3]ReadFromSolace/Read(UnboundedSolaceSource)4/...
   1/1 (100%) 2024-10-23 16:12:04 125ms 10.8 KB 10.8 KB 0 B (0 B)
   
   Source: Impulse -> [3]ReadFromSolace/Read(UnboundedSolaceSource)3/...
   1/1 (100%) 2024-10-23 16:12:05 260ms 10.8 KB 10.8 KB 0 B (0 B)
   ```
   
   Cross-language operator (growing):
   ```
   [4]{ReadFromSolace, Flatten}
   1/1 (100%) 2024-10-23 16:32:38 445ms 4.27 MB 5.40 MB 0 B (0 B)
   
   SubTasks:
   End to End Duration: 445ms
   Checkpointed Data Size: 4.27 MB
   Full Checkpoint Data Size: 5.40 MB
   Sync Duration: 364ms
   Async Duration: 19ms
   ```
   
   ### Pipeline Options
   ```
   "--runner=FlinkRunner",
   "--flink_master=localhost:8081",
   "--streaming",
   "--flink_version=1.18",
   "--setup_file=./setup.py",
   "--checkpointing_interval=10000",
   "--checkpointing_mode=EXACTLY_ONCE"
   ```
   
   ### Attempted Solutions
   1. Added Reshuffle after cross-language boundary - No effect
   2. Added windowing - No effect
   3. Tried different state backend configurations - Still growing
   
   ### Expected Behavior
   The state size should remain stable or have bounded growth, similar to the 
Java source splits (~10.8KB each).
   
   ### Actual Behavior
   The state size in the cross-language boundary operator keeps growing 
(currently at 4.27MB checkpointed data size and 5.40MB full checkpoint data 
size) and does not appear to have an upper bound.
   
   ### Additional Context
   - This happens specifically in the operator responsible for moving data from 
Java to Python
   - The behavior is consistent regardless of the operation being performed at 
the boundary
   - The state growth appears to be related to the cross-language translation 
mechanism rather than the specific operations
   - Aside from this, the external transform works as intended, on checkpoint 
the messages get acked
   
   ### Questions
   1. Is this a known issue with cross-language transforms?
   2. Are there any configurations or best practices for managing state in 
cross-language scenarios?
   3. Could this be related to how state is managed in the portable runner for 
cross-language operations?
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to