amontoli opened a new issue, #25709:
URL: https://github.com/apache/beam/issues/25709
### What happened?
I tried to run a pipeline on a Flink cluster using the PortableRunner. The
pipeline contains a dummy DoFn, which simply logs each element it processes,
and a dummy stateful DoFn, identical to the first one except that it declares a
ReadModifyWriteStateSpec and takes it as a `process` parameter. I am using
Beam 2.45.0. Here is the sample code:
```
import logging
from typing import Tuple

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

logging.basicConfig(level=logging.INFO)


class DummyDoFn(beam.DoFn):
    def process(self, element):
        logging.info(f"Input to DummyDoFn: " + str(element))
        yield element


class DummyReadModifyWriteStateDoFn(beam.DoFn):
    VALUES = beam.transforms.userstate.ReadModifyWriteStateSpec(
        name="values", coder=beam.coders.PickleCoder())

    def process(self, element: Tuple[int, int],
                values=beam.DoFn.StateParam(VALUES)):
        logging.info(f"Input to DummyReadModifyWriteStateDoFn: " +
                     str(element))
        yield element


def run():
    common_options = ['--job_name=Test stateful DoFn', '--streaming']
    flink = True
    if flink:
        runner_options = [
            '--runner=PortableRunner',
            '--job_endpoint=localhost:8099',
            '--artifact_endpoint=localhost:8098',
            '--environment_type=EXTERNAL',
            '--environment_config=localhost:50000',
        ]
    else:
        runner_options = ["--runner=DirectRunner"]
    options = common_options + runner_options
    beam_options = PipelineOptions(flags=options)
    with beam.Pipeline(options=beam_options) as pipeline:
        (pipeline
         | beam.Create([i for i in range(10)])
         | beam.Map(lambda x: (1, x))
         | "DummyDoFn1" >> beam.ParDo(DummyDoFn())
         | "DummyDoFn2" >> beam.ParDo(DummyReadModifyWriteStateDoFn())
         | beam.Map(lambda x: logging.info("Final output" + str(x))))


if __name__ == "__main__":
    run()
```
If I run this pipeline on the Flink cluster (`flink = True`), the output is:
```
2023-03-03 11:12:12,314 INFO pipeline.py:18 [] - Input to DummyDoFn: (1, 0)
2023-03-03 11:12:12,314 INFO pipeline.py:18 [] - Input to DummyDoFn: (1, 1)
2023-03-03 11:12:12,314 INFO pipeline.py:18 [] - Input to DummyDoFn: (1, 2)
2023-03-03 11:12:12,314 INFO pipeline.py:18 [] - Input to DummyDoFn: (1, 3)
2023-03-03 11:12:12,314 INFO pipeline.py:18 [] - Input to DummyDoFn: (1, 4)
2023-03-03 11:12:12,314 INFO pipeline.py:18 [] - Input to DummyDoFn: (1, 5)
2023-03-03 11:12:12,314 INFO pipeline.py:18 [] - Input to DummyDoFn: (1, 6)
2023-03-03 11:12:12,315 INFO pipeline.py:18 [] - Input to DummyDoFn: (1, 7)
2023-03-03 11:12:12,315 INFO pipeline.py:18 [] - Input to DummyDoFn: (1, 8)
2023-03-03 11:12:12,315 INFO pipeline.py:18 [] - Input to DummyDoFn: (1, 9)
2023-03-03 11:12:12,320 INFO org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-2
2023-03-03 11:12:12,321 INFO org.apache.flink.runtime.taskmanager.Task [] - [3]{Create, Map(<lambda at pipeline.py:51>), DummyDoFn1} (1/1)#0 (7b7e232cdc48cf8b67549d57b22af77d) switched from RUNNING to FINISHED.
2023-03-03 11:12:12,322 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for [3]{Create, Map(<lambda at pipeline.py:51>), DummyDoFn1} (1/1)#0 (7b7e232cdc48cf8b67549d57b22af77d).
2023-03-03 11:12:12,323 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task [3]{Create, Map(<lambda at pipeline.py:51>), DummyDoFn1} (1/1)#0 7b7e232cdc48cf8b67549d57b22af77d.
2023-03-03 11:12:12,338 INFO pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 0)
2023-03-03 11:12:12,338 INFO pipeline.py:54 [] - Final output(1, 0)
2023-03-03 11:12:12,338 INFO pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 1)
2023-03-03 11:12:12,338 INFO pipeline.py:54 [] - Final output(1, 1)
2023-03-03 11:12:12,338 INFO pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 2)
2023-03-03 11:12:12,338 INFO pipeline.py:54 [] - Final output(1, 2)
2023-03-03 11:12:12,339 INFO pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 3)
2023-03-03 11:12:12,339 INFO pipeline.py:54 [] - Final output(1, 3)
2023-03-03 11:12:12,339 INFO pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 4)
2023-03-03 11:12:12,339 INFO pipeline.py:54 [] - Final output(1, 4)
2023-03-03 11:12:12,339 INFO pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 5)
2023-03-03 11:12:12,339 INFO pipeline.py:54 [] - Final output(1, 5)
2023-03-03 11:12:12,339 INFO pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 6)
2023-03-03 11:12:12,339 INFO pipeline.py:54 [] - Final output(1, 6)
2023-03-03 11:12:12,339 INFO pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 7)
2023-03-03 11:12:12,339 INFO pipeline.py:54 [] - Final output(1, 7)
2023-03-03 11:12:12,339 INFO pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 8)
2023-03-03 11:12:12,340 INFO pipeline.py:54 [] - Final output(1, 8)
2023-03-03 11:12:12,340 INFO pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 9)
2023-03-03 11:12:12,340 INFO pipeline.py:54 [] - Final output(1, 9)
```
On the other hand, if I run it on DirectRunner (`flink = False`), I get:
```
INFO:root:Input to DummyDoFn: (1, 0)
INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 0)
INFO:root:Final output(1, 0)
INFO:root:Input to DummyDoFn: (1, 1)
INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 1)
INFO:root:Final output(1, 1)
INFO:root:Input to DummyDoFn: (1, 2)
INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 2)
INFO:root:Final output(1, 2)
INFO:root:Input to DummyDoFn: (1, 3)
INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 3)
INFO:root:Final output(1, 3)
INFO:root:Input to DummyDoFn: (1, 4)
INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 4)
INFO:root:Final output(1, 4)
INFO:root:Input to DummyDoFn: (1, 5)
INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 5)
INFO:root:Final output(1, 5)
INFO:root:Input to DummyDoFn: (1, 6)
INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 6)
INFO:root:Final output(1, 6)
INFO:root:Input to DummyDoFn: (1, 7)
INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 7)
INFO:root:Final output(1, 7)
INFO:root:Input to DummyDoFn: (1, 8)
INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 8)
INFO:root:Final output(1, 8)
INFO:root:Input to DummyDoFn: (1, 9)
INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 9)
INFO:root:Final output(1, 9)
```
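To compare the two logs mechanically rather than by eye, one can extract the DoFn name and element from each "Input to ..." message and check whether every element reaches the stateful DoFn before the next element enters DummyDoFn. This is a minimal sketch, assuming log messages in the format shown above; `step_events` and `is_element_at_a_time` are hypothetical helpers, not part of Beam, and the sample lines are excerpts of the two runs.

```python
import re
from typing import List, Tuple

# Assumes messages shaped like those emitted by the sample pipeline,
# e.g. "Input to DummyDoFn: (1, 0)". "Final output" lines do not match.
INPUT_RE = re.compile(r"Input to (\w+): \((\d+), (\d+)\)")


def step_events(lines: List[str]) -> List[Tuple[str, int]]:
    """Return (DoFn name, element value) pairs in the order they were logged."""
    events = []
    for line in lines:
        match = INPUT_RE.search(line)
        if match:
            events.append((match.group(1), int(match.group(3))))
    return events


def is_element_at_a_time(lines: List[str],
                         first: str = "DummyDoFn",
                         second: str = "DummyReadModifyWriteStateDoFn") -> bool:
    """True if each element reaches `second` right after entering `first`,
    i.e. before the next element enters `first`."""
    events = step_events(lines)
    expected: List[Tuple[str, int]] = []
    for name, value in events:
        if name == first:
            expected += [(first, value), (second, value)]
    return events == expected


# Excerpts of the DirectRunner and Flink logs shown above.
direct_run = [
    "INFO:root:Input to DummyDoFn: (1, 0)",
    "INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 0)",
    "INFO:root:Final output(1, 0)",
    "INFO:root:Input to DummyDoFn: (1, 1)",
    "INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 1)",
]
flink_run = [
    "[] - Input to DummyDoFn: (1, 0)",
    "[] - Input to DummyDoFn: (1, 1)",
    "[] - Input to DummyReadModifyWriteStateDoFn: (1, 0)",
    "[] - Input to DummyReadModifyWriteStateDoFn: (1, 1)",
]
print(is_element_at_a_time(direct_run))  # True
print(is_element_at_a_time(flink_run))   # False
```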
The outputs of the two runners differ: on Flink, all elements are processed by
the first step and only then forwarded to the second step, whereas on the
DirectRunner each element passes through both steps before the next one is
processed. I think the expected behavior is the second one. This can be a
blocking issue in streaming pipelines: since the data is unbounded, the first
step "never ends", so no data ever reaches the second step.
I also noticed that this issue affects both ReadModifyWriteStateSpec and
BagStateSpec, but not CombiningValueStateSpec. Moreover, the issue disappears
when I remove the `values` parameter from the `process` method of the stateful
DoFn.
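Why the Flink ordering stalls on unbounded input can be illustrated with a toy model in plain Python. This is only an analogy, not how Flink or the Beam portability framework actually schedules work: `staged` and `fused` are hypothetical functions standing in for the two orderings observed in the logs, and Python generators stand in for a streaming source.

```python
import itertools
from typing import Iterable, Iterator, List, Tuple

Event = Tuple[str, int]  # (step name, element)


def staged(source: Iterable[int], log: List[Event]) -> Iterator[int]:
    """Toy model of the Flink ordering: step 1 drains the whole input first."""
    buffered = []
    for x in source:  # never finishes if `source` is unbounded
        log.append(("step1", x))
        buffered.append(x)
    for x in buffered:
        log.append(("step2", x))
        yield x


def fused(source: Iterable[int], log: List[Event]) -> Iterator[int]:
    """Toy model of the DirectRunner ordering: one element at a time."""
    for x in source:
        log.append(("step1", x))
        log.append(("step2", x))
        yield x


if __name__ == "__main__":
    # Bounded input: same outputs, different interleaving in the log.
    for name, model in (("staged", staged), ("fused", fused)):
        log: List[Event] = []
        print(name, list(model(range(3), log)), log)
    # Unbounded input: the fused model still produces output...
    print(list(itertools.islice(fused(itertools.count(), []), 3)))  # [0, 1, 2]
    # ...whereas pulling even one element from staged(itertools.count(), [])
    # would loop forever, like the second step that never receives data.
```

With a bounded source both models emit the same elements, which is why the difference only becomes blocking once the input is unbounded.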
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [X] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner