tvalentyn commented on issue #27156:
URL: https://github.com/apache/beam/issues/27156#issuecomment-1848374020
The behavior you are describing is surprising. A failure in `finish_bundle`
results in a bundle retry, but that should not cause elements to be written
twice in the IO step. I could not reproduce the behavior described here. I tried
running the following pipeline on Dataflow:
```
import argparse
import logging

import apache_beam as beam
from apache_beam.testing.synthetic_pipeline import SyntheticSource

_LOGGER = logging.getLogger(__name__)

tf_out_path = 'gs://clouddfe-valentyn/finish_repro_10m'


def tf_record_finish_bundle(p):
  class SometimesFailFn(beam.DoFn):
    def start_bundle(self):
      _LOGGER.warning("Start processing bundle")

    def finish_bundle(self):
      # Fail roughly a quarter of the bundles to force bundle retries.
      from random import random
      if random() < 0.25:
        raise RuntimeError("Intentionally failing finish_bundle.")
      _LOGGER.warning("Finished processing bundle")

    def process(self, element):
      # Elements are (key, value) pairs; keep only the value.
      yield element[1]

  (
      p
      | beam.io.Read(
          SyntheticSource({
              "numRecords": 10000000,
              "keySizeBytes": 1,
              "valueSizeBytes": 1
          }))
      | beam.Reshuffle()
      | beam.ParDo(SometimesFailFn())
      | beam.io.tfrecordio.WriteToTFRecord(tf_out_path)
  )


def verify_tfrecord_output(p):
  (
      p
      | beam.io.tfrecordio.ReadFromTFRecord(tf_out_path + "*")
      | beam.combiners.Count.Globally()
      | "LogCount" >> beam.LogElements(
          prefix="TotalCount ", level=logging.WARNING)
  )


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)
  print(known_args)
  print(pipeline_args)

  # The first pipeline writes the records; the second counts what was written.
  with beam.Pipeline(argv=pipeline_args) as p:
    tf_record_finish_bundle(p)

  with beam.Pipeline(argv=pipeline_args) as p:
    verify_tfrecord_output(p)


if __name__ == '__main__':
  run()
```
I verified that the "Intentionally failing finish_bundle" error occurred at
least once, but the total number of elements written by TFRecordIO was still as
expected.
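
For intuition about why a retried bundle should not produce duplicates, here is a minimal pure-Python sketch. It is not Beam or Dataflow code. It assumes a simplified model in which the output of a failed bundle attempt is discarded and only the output of a successful attempt is committed; the names `run_bundle` and `run_with_retries` are hypothetical, and real runners also limit the number of retries.

```python
import random


def run_bundle(elements, fail_probability, rng):
    """Simulate one attempt at a bundle; may fail the way finish_bundle does."""
    staged = list(elements)  # process(): stage output for this attempt only
    if rng.random() < fail_probability:  # finish_bundle(): sometimes fail
        raise RuntimeError("Intentionally failing finish_bundle.")
    return staged


def run_with_retries(bundles, fail_probability=0.25, seed=0):
    """Retry each bundle until it succeeds; commit only successful attempts."""
    rng = random.Random(seed)
    committed = []
    failures = 0
    for bundle in bundles:
        while True:
            try:
                staged = run_bundle(bundle, fail_probability, rng)
            except RuntimeError:
                # Assumption: staged output of a failed attempt is dropped.
                failures += 1
                continue
            committed.extend(staged)
            break
    return committed, failures


# 100 hypothetical bundles of 100 elements each.
bundles = [list(range(i, i + 100)) for i in range(0, 10_000, 100)]
committed, failures = run_with_retries(bundles)
print(f"committed={len(committed)} failed_attempts={failures}")
```

Under this model the committed count always equals the input count, regardless of how many attempts failed, which matches what the pipeline above showed.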