AMMJ93 opened a new issue, #32924:
URL: https://github.com/apache/beam/issues/32924
### What happened?
### Description
I have written an External Transform for the SolaceIO Read to make it
available in Python. When I use the Solace IO connector through the
expansion service in a Python pipeline on the Flink runner, the checkpoint
state size of the cross-language boundary operator keeps growing, even though
no messages are flowing through the pipeline. The Java source splits, by
contrast, keep a stable state size. Eventually the state becomes too large and
the pipeline crashes.
### Environment
- Apache Beam Python SDK 2.59.0
- Local Flink 1.18.1 cluster (Flink Runner)
- Java Expansion Service for Solace IO 2.59.0
- RocksDB State Backend
### Pipeline Structure
1. Java side (Expansion Service):
   - Multiple Solace source splits (the number depends on the number of
     connections), producing a `PCollectionList<byte[]>`
   - Each split maintains a stable state (~10.8 KB)
2. Cross-language boundary:
   - State grows continuously
   - Checkpoint sizes increase with every checkpoint (latest: 4.27 MB
     checkpointed data size, 5.40 MB full checkpoint data size)
   - Growth happens regardless of the operation type (Flatten, Map, etc.)
### Metrics Evidence
Source splits (stable):
```
Source: Impulse -> [3]ReadFromSolace/Read(UnboundedSolaceSource)4/...
1/1 (100%) 2024-10-23 16:12:04 125ms 10.8 KB 10.8 KB 0 B (0 B)
Source: Impulse -> [3]ReadFromSolace/Read(UnboundedSolaceSource)3/...
1/1 (100%) 2024-10-23 16:12:05 260ms 10.8 KB 10.8 KB 0 B (0 B)
```
Cross-language operator (growing):
```
[4]{ReadFromSolace, Flatten}
1/1 (100%) 2024-10-23 16:32:38 445ms 4.27 MB 5.40 MB 0 B (0 B)
SubTasks:
End to End Duration: 445ms
Checkpointed Data Size: 4.27 MB
Full Checkpoint Data Size: 5.40 MB
Sync Duration: 364ms
Async Duration: 19ms
```
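For scale, the sizes reported above can be compared directly. The sketch below converts the Flink UI size strings to bytes and divides the boundary operator's sizes by one source split's state. It assumes the UI's KB/MB are binary units (1 KB = 1024 B); with decimal units the ratios shift by a few percent but stay in the same range.

```python
# Minimal sketch: compare the cross-language boundary operator's checkpoint
# sizes with the state of a single UnboundedSolaceSource split, using the
# values shown in the Flink UI above.
# Assumption: "KB"/"MB" in the UI are binary units (1 KB = 1024 B).

UNITS = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def to_bytes(size: str) -> float:
    """Convert a Flink UI size string such as '10.8 KB' to bytes."""
    value, unit = size.split()
    return float(value) * UNITS[unit]


split_state = to_bytes("10.8 KB")            # one Java source split
boundary_checkpointed = to_bytes("4.27 MB")  # [4]{ReadFromSolace, Flatten}
boundary_full = to_bytes("5.40 MB")          # full checkpoint data size

print(f"checkpointed / split state: {boundary_checkpointed / split_state:.0f}x")
print(f"full / split state:         {boundary_full / split_state:.0f}x")
```

Under the binary-unit assumption, the boundary operator's checkpointed data is about 405 times one split's state, and its full checkpoint data about 512 times.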
### Pipeline Options
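```python
# Options passed to the Python pipeline: Flink runner in streaming mode,
# exactly-once checkpoints every 10,000 ms.
pipeline_args = [
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--streaming",
    "--flink_version=1.18",
    "--setup_file=./setup.py",
    "--checkpointing_interval=10000",
    "--checkpointing_mode=EXACTLY_ONCE",
]
```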
### Attempted Solutions
1. Added a `Reshuffle` after the cross-language boundary: no effect
2. Added windowing: no effect
3. Tried different state backend configurations: state still grows
### Expected Behavior
The state size should remain stable or have bounded growth, similar to the
Java source splits (~10.8KB each).
### Actual Behavior
The state size in the cross-language boundary operator keeps growing
(currently at 4.27MB checkpointed data size and 5.40MB full checkpoint data
size) and does not appear to have an upper bound.
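One way to tell bounded growth from unbounded growth is to track the boundary operator's size over consecutive checkpoints. The sketch below is a hypothetical helper, not part of Beam or Flink: it assumes the per-checkpoint sizes (in bytes, oldest first) have been collected by hand from the Flink UI checkpoint history, and reports whether every checkpoint was larger than the previous one and by how much on average.

```python
from typing import Sequence


def grows_every_checkpoint(sizes: Sequence[int], min_checkpoints: int = 3) -> bool:
    """Return True if each checkpoint is strictly larger than the previous one.

    `sizes` holds one operator's checkpoint size in bytes per completed
    checkpoint, oldest first (hypothetical input, collected by hand).
    """
    if len(sizes) < min_checkpoints:
        return False  # too few samples to call it a trend
    return all(later > earlier for earlier, later in zip(sizes, sizes[1:]))


def growth_per_checkpoint(sizes: Sequence[int]) -> float:
    """Average size increase in bytes between consecutive checkpoints."""
    if len(sizes) < 2:
        return 0.0
    return (sizes[-1] - sizes[0]) / (len(sizes) - 1)


if __name__ == "__main__":
    # Hypothetical sizes for illustration only, not measured values.
    history = [4_000_000, 4_200_000, 4_400_000]
    print(grows_every_checkpoint(history), growth_per_checkpoint(history))
```

A stable operator such as the Java source splits would show `grows_every_checkpoint(...) == False` and an average growth near zero.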
### Additional Context
- This happens specifically in the operator responsible for moving data from
Java to Python
- The behavior is consistent regardless of the operation being performed at
the boundary
- The state growth appears to be related to the cross-language translation
mechanism rather than the specific operations
- Aside from this, the external transform works as intended: messages are
acknowledged on checkpoint
### Questions
1. Is this a known issue with cross-language transforms?
2. Are there any configurations or best practices for managing state in
cross-language scenarios?
3. Could this be related to how state is managed in the portable runner for
cross-language operations?
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [X] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [X] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner