+1. Specifically, p.run().waitUntilFinish() would throw an exception if there were errors during pipeline execution.
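As a minimal sketch of what that looks like in the submitting program (assuming a constructed Pipeline named p and an SLF4J logger named log, as in the code further down the thread; the exact exception type is runner-dependent, e.g. the DirectRunner throws Pipeline.PipelineExecutionException, while other runners may instead return a terminal state):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;

    try {
        // Blocks until the pipeline reaches a terminal state; runners that
        // support it rethrow failures from remotely executed DoFns here.
        PipelineResult.State state = p.run().waitUntilFinish();
        if (state != PipelineResult.State.DONE) {
            // Some runners report failure via the terminal state rather
            // than by throwing, so check it explicitly.
            throw new RuntimeException("Pipeline terminated in state " + state);
        }
        log.info("Pipeline finished successfully");
    } catch (Pipeline.PipelineExecutionException e) {
        log.error("Pipeline execution failed: {}", e.getMessage());
        throw e;
    }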
On Wed, Nov 8, 2023 at 8:05 AM John Casey via dev <dev@beam.apache.org> wrote:

> Yep, that's a common misunderstanding with Beam.
>
> The code that is actually executed in the try block is just for pipeline
> construction; no data is processed at this point in time.
>
> Once the pipeline is constructed, the various ParDos are serialized and
> sent to the runners, where they are actually executed.
>
> In this case, if there was an exception in the ParDo that converts Rows to
> Avro, you would see the "Exception when converting Beam Row to Avro Record"
> log in whatever logs your runner provides you, and the exception would
> propagate up to your runner.
>
> Likewise, your log statement log.info("Finished writing Parquet file to
> path {}", writePath); is inaccurate: it fires when the pipeline is
> constructed, not when the Parquet write completes.
>
> On Wed, Nov 8, 2023 at 10:51 AM Ramya Prasad via dev <dev@beam.apache.org>
> wrote:
>
>> Hey John,
>>
>> Yes, that's how my code is set up: I have the FileIO.write() in its own
>> try-catch block. I took a second look at where exactly the code is
>> failing, and it's actually in a ParDo that runs before I call
>> FileIO.write(). Even within that, I've tried adding a try-catch, but the
>> error isn't stopping the actual application run in a Spark cluster. In
>> the cluster, I see that the exception is being thrown from my ParDo, but
>> immediately after that I see the line INFO ApplicationMaster: Final app
>> status: SUCCEEDED, exitCode: 0. This is roughly what my code setup looks
>> like:
>>
>> @Slf4j
>> public class ParquetWriteActionStrategy {
>>
>>     public void executeWriteAction(Pipeline p) throws Exception {
>>         try {
>>             // transform PCollection from type Row to GenericRecords
>>             PCollection<GenericRecord> records = p.apply(
>>                 "transform PCollection from type Row to GenericRecords",
>>                 ParDo.of(new DoFn<Row, GenericRecord>() {
>>                     @ProcessElement
>>                     public void processElement(@Element Row row,
>>                             OutputReceiver<GenericRecord> out) {
>>                         try {
>>                             <convert Row to Avro Record>
>>                         } catch (Exception e) {
>>                             log.error("Exception when converting Beam Row to Avro Record: {}",
>>                                 e.getMessage());
>>                             throw e;
>>                         }
>>                     }
>>                 })).setCoder(AvroCoder.of(avroSchema));
>>
>>             records.apply("Writing Parquet Output File",
>>                 FileIO.<GenericRecord>write()
>>                     .via(<schema>)
>>                     .to(writePath)
>>                     .withSuffix(".parquet"));
>>
>>             log.info("Finished writing Parquet file to path {}", writePath);
>>         } catch (Exception e) {
>>             log.error("Error in Parquet Write Action. {}", e.getMessage());
>>             throw e;
>>         }
>>     }
>> }
>>
>> On Wed, Nov 8, 2023 at 9:16 AM John Casey via dev <dev@beam.apache.org>
>> wrote:
>>
>>> There are two execution times when using Beam. The first execution is
>>> local, when the pipeline is constructed, and the second is remote on
>>> the runner, processing data.
>>>
>>> Based on what you said, it sounds like you are wrapping pipeline
>>> construction in a try-catch, and constructing FileIO isn't failing.
>>>
>>> e.g.
>>>
>>> try {
>>>     FileIO.write().someOtherConfigs()
>>> } catch ...
>>>
>>> This will catch any exceptions thrown while constructing the FileIO
>>> transform, but the running pipeline won't propagate exceptions through
>>> this catch block.
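If the goal is for bad rows not to take down the whole job while still being visible, one option is a dead-letter output: catch the conversion error inside the DoFn and route the failing Row to a secondary output instead of rethrowing. A rough sketch, not a drop-in replacement for the code above (rows, convertToAvro, and avroSchema are stand-ins for the names in that pipeline, and the AvroCoder import path varies across Beam versions):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    final TupleTag<GenericRecord> convertedTag = new TupleTag<GenericRecord>() {};
    final TupleTag<Row> failedTag = new TupleTag<Row>() {};

    PCollectionTuple outputs = rows.apply("Convert Row to GenericRecord",
        ParDo.of(new DoFn<Row, GenericRecord>() {
            @ProcessElement
            public void processElement(@Element Row row, MultiOutputReceiver out) {
                try {
                    // convertToAvro is a stand-in for the Row-to-Avro logic.
                    out.get(convertedTag).output(convertToAvro(row));
                } catch (Exception e) {
                    // Route the bad row to the dead-letter output instead
                    // of failing the bundle.
                    out.get(failedTag).output(row);
                }
            }
        }).withOutputTags(convertedTag, TupleTagList.of(failedTag)));

    PCollection<GenericRecord> converted =
        outputs.get(convertedTag).setCoder(AvroCoder.of(avroSchema));
    // Reuse the input coder for the failed rows; write or count them
    // downstream so failures stay visible even though the job succeeds.
    PCollection<Row> deadLetters = outputs.get(failedTag).setCoder(rows.getCoder());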
>>> On Tue, Nov 7, 2023 at 5:21 PM Robert Bradshaw via dev <dev@beam.apache.org>
>>> wrote:
>>>
>>>> File write failures should be throwing exceptions that will terminate
>>>> the pipeline on failure. (Generally a distributed runner will make
>>>> multiple attempts before abandoning the entire pipeline, of course.)
>>>>
>>>> Are you seeing files failing to be written but no exceptions being
>>>> thrown? If so, this is definitely a bug that we want to resolve.
>>>>
>>>> On Tue, Nov 7, 2023 at 11:17 AM Ramya Prasad via dev <dev@beam.apache.org>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am a developer using Apache Beam in my Java application, and I need
>>>>> some help on how to handle exceptions when writing a file to S3. I
>>>>> have tried wrapping my code within a try-catch block, but no exception
>>>>> is being thrown within the try block. I'm assuming that FileIO doesn't
>>>>> throw any exceptions upon failure. Is there a way in which I can
>>>>> either terminate the program on failure, or at least be made aware if
>>>>> any of my write operations fail?
>>>>>
>>>>> Thanks and sincerely,
>>>>> Ramya
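To the original question of at least being made aware of failures, even when the Spark ApplicationMaster reports SUCCEEDED, one further option besides checking waitUntilFinish() is Beam's metrics API: increment a Counter in the DoFn's catch block and query it after the run. A sketch using the same stand-in names as the dead-letter example above; the namespace and counter name are arbitrary, and whether committed (as opposed to attempted) values are available depends on the runner:

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.MetricNameFilter;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.metrics.MetricsFilter;

    // Inside the DoFn: declare a counter as a field and increment it in
    // the catch block instead of (or in addition to) rethrowing.
    private final Counter conversionFailures =
        Metrics.counter("parquet-write", "row-conversion-failures");
    // ... in the catch block: conversionFailures.inc();

    // After the run: query the counter and fail the job explicitly if
    // anything went wrong, rather than trusting the exit code alone.
    PipelineResult result = p.run();
    result.waitUntilFinish();
    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(
                MetricNameFilter.named("parquet-write", "row-conversion-failures"))
            .build());
    for (MetricResult<Long> counter : metrics.getCounters()) {
        if (counter.getAttempted() > 0) {
            throw new RuntimeException(
                counter.getAttempted() + " rows failed conversion");
        }
    }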