amontoli opened a new issue, #25709:
URL: https://github.com/apache/beam/issues/25709

   ### What happened?
   
   I tried to run a pipeline on a Flink cluster using the PortableRunner. The pipeline contains a dummy DoFn, which simply logs the element it processes, and a dummy stateful DoFn, similar to the non-stateful one but with a ReadModifyWriteStateSpec state variable defined. I am using Beam 2.45.0. Here is the sample code:
   ```
   import logging
   from typing import Tuple
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
   
   logging.basicConfig(level=logging.INFO)
   
   class DummyDoFn(beam.DoFn):
       def process(self, element):
           logging.info(f"Input to DummyDoFn: {element}")
           yield element
   
   class DummyReadModifyWriteStateDoFn(beam.DoFn):
       VALUES = beam.transforms.userstate.ReadModifyWriteStateSpec(
           name="values", coder=beam.coders.PickleCoder())
   
       def process(self, element: Tuple[int, int],
                   values=beam.DoFn.StateParam(VALUES)):
           logging.info(f"Input to DummyReadModifyWriteStateDoFn: {element}")
           yield element
   
   def run():
       common_options = ['--job_name=Test stateful DoFn', '--streaming']
   
       flink = True
       if flink:
           runner_options = [
               '--runner=PortableRunner',
               '--job_endpoint=localhost:8099',
               '--artifact_endpoint=localhost:8098',
               '--environment_type=EXTERNAL',
               '--environment_config=localhost:50000',
           ]
       else:
           runner_options = ["--runner=DirectRunner"]
       options = common_options + runner_options
       beam_options = PipelineOptions(flags=options)
   
       with beam.Pipeline(options=beam_options) as pipeline:
           (pipeline | beam.Create([i for i in range(10)])
                     | beam.Map(lambda x: (1, x))
                     | "DummyDoFn1" >> beam.ParDo(DummyDoFn())
                     | "DummyDoFn2" >> beam.ParDo(DummyReadModifyWriteStateDoFn())
                     | beam.Map(lambda x: logging.info("Final output" + str(x))))
   
   if __name__ == "__main__":
       run()
   ```
   
   If I run this pipeline on the Flink cluster (`flink = True`), the output is:
   ```
   2023-03-03 11:12:12,314 INFO  pipeline.py:18 [] - Input to DummyDoFn: (1, 0)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18 [] - Input to DummyDoFn: (1, 1)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18 [] - Input to DummyDoFn: (1, 2)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18 [] - Input to DummyDoFn: (1, 3)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18 [] - Input to DummyDoFn: (1, 4)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18 [] - Input to DummyDoFn: (1, 5)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18 [] - Input to DummyDoFn: (1, 6)
   2023-03-03 11:12:12,315 INFO  pipeline.py:18 [] - Input to DummyDoFn: (1, 7)
   2023-03-03 11:12:12,315 INFO  pipeline.py:18 [] - Input to DummyDoFn: (1, 8)
   2023-03-03 11:12:12,315 INFO  pipeline.py:18 [] - Input to DummyDoFn: (1, 9)
   2023-03-03 11:12:12,320 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-2
   2023-03-03 11:12:12,321 INFO  org.apache.flink.runtime.taskmanager.Task [] - [3]{Create, Map(<lambda at pipeline.py:51>), DummyDoFn1} (1/1)#0 (7b7e232cdc48cf8b67549d57b22af77d) switched from RUNNING to FINISHED.
   2023-03-03 11:12:12,322 INFO  org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for [3]{Create, Map(<lambda at pipeline.py:51>), DummyDoFn1} (1/1)#0 (7b7e232cdc48cf8b67549d57b22af77d).
   2023-03-03 11:12:12,323 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task [3]{Create, Map(<lambda at pipeline.py:51>), DummyDoFn1} (1/1)#0 7b7e232cdc48cf8b67549d57b22af77d.
   2023-03-03 11:12:12,338 INFO  pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 0)
   2023-03-03 11:12:12,338 INFO  pipeline.py:54 [] - Final output(1, 0)
   2023-03-03 11:12:12,338 INFO  pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 1)
   2023-03-03 11:12:12,338 INFO  pipeline.py:54 [] - Final output(1, 1)
   2023-03-03 11:12:12,338 INFO  pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 2)
   2023-03-03 11:12:12,338 INFO  pipeline.py:54 [] - Final output(1, 2)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 3)
   2023-03-03 11:12:12,339 INFO  pipeline.py:54 [] - Final output(1, 3)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 4)
   2023-03-03 11:12:12,339 INFO  pipeline.py:54 [] - Final output(1, 4)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 5)
   2023-03-03 11:12:12,339 INFO  pipeline.py:54 [] - Final output(1, 5)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 6)
   2023-03-03 11:12:12,339 INFO  pipeline.py:54 [] - Final output(1, 6)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 7)
   2023-03-03 11:12:12,339 INFO  pipeline.py:54 [] - Final output(1, 7)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 8)
   2023-03-03 11:12:12,340 INFO  pipeline.py:54 [] - Final output(1, 8)
   2023-03-03 11:12:12,340 INFO  pipeline.py:26 [] - Input to DummyReadModifyWriteStateDoFn: (1, 9)
   2023-03-03 11:12:12,340 INFO  pipeline.py:54 [] - Final output(1, 9)
   ```
   
   On the other hand, if I run it on DirectRunner (`flink = False`), I get:
   ```
   INFO:root:Input to DummyDoFn: (1, 0)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 0)
   INFO:root:Final output(1, 0)
   INFO:root:Input to DummyDoFn: (1, 1)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 1)
   INFO:root:Final output(1, 1)
   INFO:root:Input to DummyDoFn: (1, 2)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 2)
   INFO:root:Final output(1, 2)
   INFO:root:Input to DummyDoFn: (1, 3)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 3)
   INFO:root:Final output(1, 3)
   INFO:root:Input to DummyDoFn: (1, 4)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 4)
   INFO:root:Final output(1, 4)
   INFO:root:Input to DummyDoFn: (1, 5)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 5)
   INFO:root:Final output(1, 5)
   INFO:root:Input to DummyDoFn: (1, 6)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 6)
   INFO:root:Final output(1, 6)
   INFO:root:Input to DummyDoFn: (1, 7)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 7)
   INFO:root:Final output(1, 7)
   INFO:root:Input to DummyDoFn: (1, 8)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 8)
   INFO:root:Final output(1, 8)
   INFO:root:Input to DummyDoFn: (1, 9)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 9)
   INFO:root:Final output(1, 9)
   ```
   The output of the two runners differs: on the PortableRunner, every element is processed by the first step before anything is forwarded to the second step, whereas on the DirectRunner each element flows through both steps before the next element is processed. I believe the second behavior is the expected one. This issue can be blocking in streaming cases: with unbounded data the first step "never ends", so the second step never receives any data.
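
   To make the ordering difference concrete, here is a minimal pure-Python sketch of the two schedules (illustrative only; it does not model Beam internals):
   
   ```python
   elements = [(1, i) for i in range(3)]
   
   def staged(elements):
       # Schedule observed on the PortableRunner: step 1 drains the whole
       # input before step 2 sees a single element.
       step1 = [("step1", e) for e in elements]
       step2 = [("step2", e) for _, e in step1]
       return step1 + step2
   
   def interleaved(elements):
       # Schedule observed on the DirectRunner: each element traverses both
       # steps before the next element is read.
       out = []
       for e in elements:
           out.append(("step1", e))
           out.append(("step2", e))
       return out
   ```
   
   With an unbounded input, the `staged` schedule never reaches step 2, which is exactly the blocking scenario described above.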
   
   I also noticed that this issue affects both ReadModifyWriteStateSpec and BagStateSpec, but not CombiningValueStateSpec, and that it disappears when I remove the `values` parameter from the `process` method of the stateful DoFn class.
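
   For reference, the variants compared above differ only in the state spec. Minimal sketches (logging omitted; `sum` is just an arbitrary example combiner for `CombiningValueStateSpec`):
   
   ```python
   import apache_beam as beam
   
   class DummyBagStateDoFn(beam.DoFn):
       # Buffering also occurs with BagStateSpec.
       VALUES = beam.transforms.userstate.BagStateSpec(
           name="values", coder=beam.coders.PickleCoder())
   
       def process(self, element, values=beam.DoFn.StateParam(VALUES)):
           yield element
   
   class DummyCombiningStateDoFn(beam.DoFn):
       # No buffering with CombiningValueStateSpec.
       VALUES = beam.transforms.userstate.CombiningValueStateSpec(
           name="values", combine_fn=sum)
   
       def process(self, element, values=beam.DoFn.StateParam(VALUES)):
           yield element
   ```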
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
