tvalentyn commented on issue #27156:
URL: https://github.com/apache/beam/issues/27156#issuecomment-1848374020

   The behavior you are describing is surprising. A failure in `finish_bundle` 
results in a bundle retry, but that should not cause elements to be written twice 
in the IO step. I could not reproduce the behavior described here. I tried running 
the following pipeline on Dataflow:
   
    ```python
    import argparse
    import logging
    
    import apache_beam as beam
    from apache_beam.testing.synthetic_pipeline import SyntheticSource
    
    _LOGGER = logging.getLogger(__name__)
    tf_out_path = 'gs://clouddfe-valentyn/finish_repro_10m'
   
   def tf_record_finish_bundle(p):
     class SometimesFailFn(beam.DoFn):
       def start_bundle(self):
         _LOGGER.warning("Start processing bundle")
   
       def finish_bundle(self):
         from random import random
         if random() < 0.25:
            raise RuntimeError("Intentionally failing finish_bundle.")
   
         _LOGGER.warning("Finished processing bundle")
   
       def process(self, element):
         yield element[1]
   
     (
       p
       | beam.io.Read(
                 SyntheticSource({
                      "numRecords": 10000000,
                      "keySizeBytes": 1,
                      "valueSizeBytes": 1,
                  }))
   
      | beam.Reshuffle()
      | beam.ParDo(SometimesFailFn())
      | beam.io.tfrecordio.WriteToTFRecord(tf_out_path)
     )
   
   
   def verify_tfrecord_output(p):
     (p
      | beam.io.tfrecordio.ReadFromTFRecord(tf_out_path+"*")
      | beam.combiners.Count.Globally()
       | "LogCount" >> beam.LogElements(prefix="TotalCount ", level=logging.WARNING)
     )
   
   def run(argv=None):
     parser = argparse.ArgumentParser()
     known_args, pipeline_args = parser.parse_known_args(argv)
     print(known_args)
     print(pipeline_args)
   
     with beam.Pipeline(argv=pipeline_args) as p:
       tf_record_finish_bundle(p)
   
     with beam.Pipeline(argv=pipeline_args) as p:
       verify_tfrecord_output(p)
   
   ```
   
   I verified that there was at least one "Intentionally failing finish_bundle" 
error in the logs, but the total number of elements written by TFRecordIO was as expected.
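   For context on why a bundle retry does not double-write: Beam's file-based sinks 
(including `WriteToTFRecord`) write each bundle attempt to a uniquely named temporary 
file and only move the output of successfully completed bundles into the final 
location during finalization. Below is a simplified sketch of that pattern; it is 
illustrative only, not Beam's actual implementation, and all names in it are made up:

```python
import os
import shutil
import tempfile
import uuid


def write_bundle(records, tmp_dir):
    """Write one bundle attempt to a uniquely named temp file."""
    path = os.path.join(tmp_dir, f"bundle-{uuid.uuid4().hex}.tmp")
    with open(path, "w") as f:
        for record in records:
            f.write(record + "\n")
    return path


def finalize(successful_tmp_files, out_dir):
    """Promote only temp files from successful bundle attempts to final shards."""
    final_paths = []
    for i, tmp in enumerate(successful_tmp_files):
        dst = os.path.join(out_dir, f"shard-{i:05d}")
        shutil.move(tmp, dst)
        final_paths.append(dst)
    return final_paths


# Simulate a bundle whose first attempt fails (e.g. in finish_bundle) and is retried.
tmp_dir = tempfile.mkdtemp()
out_dir = tempfile.mkdtemp()
records = ["a", "b", "c"]

failed_attempt = write_bundle(records, tmp_dir)  # attempt 1: written, then bundle fails
retry_attempt = write_bundle(records, tmp_dir)   # attempt 2: reprocessed from scratch

# Only the successful attempt is finalized; the failed attempt's file is discarded.
finalize([retry_attempt], out_dir)
os.remove(failed_attempt)

written = []
for shard in sorted(os.listdir(out_dir)):
    with open(os.path.join(out_dir, shard)) as f:
        written.extend(f.read().splitlines())
print(len(written))  # 3, not 6: the retry did not double-write
```

   Because the failed attempt's temp file is never promoted, retried bundles 
replace rather than append to the output.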
   
   

