Polber commented on code in PR #28190: URL: https://github.com/apache/beam/pull/28190#discussion_r1319088438
########## sdks/python/apache_beam/yaml/yaml_errors.md: ########## @@ -0,0 +1,196 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Beam YAML Error Handling + +The larger one's pipeline gets, the more common it is to encounter "exceptional" +data that is malformatted, doesn't handle the proper preconditions, or otherwise +breaks during processing. Genrally any such record will cause the pipeline to +permanently fail, but often it is desirable to allow the pipeline to continue, +re-directing bad records to another path for special handling or simply +recording them for later off-line analysis. This is often called the +"dead letter queue" pattern. + +Beam YAML has special support for this pattern if the transform supports a +`error_handling` config parameter with an `output` field. For example, +the following code will write all "good" processed records to one file and +any "bad" records to a separate file. + +``` +pipeline: + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + + - type: MapToFields + input: ReadFromCsv + config: + language: python + fields: + col1: col1 + # This could raise a divide-by-zero error. + ratio: col2 / col3 + error_handling: + output: my_error_output + + - type: WriteToJson + input: MapToFields + config: + path: /path/to/output.json + + - type: WriteToJson + name: WriteErrorsToJson + input: MapToFields.my_error_output + config: + path: /path/to/errors.json +``` + +Note that with `error_handling` declared, `MapToFields.my_error_output` +**must** be consumed; to ignore it will be an error. Any use is fine, e.g. +logging the bad records to stdout would be sufficient (though not recommended +for a robust pipeline). + +Some transforms allow for extra arguments in their error_handling config, e.g. +for Python functions one can give a `threshold` which limits the relative number +of records that can be bad before considering the entire pipeline a failure + +``` +pipeline: + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + + - type: MapToFields + input: ReadFromCsv + config: + language: python + fields: + col1: col1 + # This could raise a divide-by-zero error. + ratio: col2 / col3 + error_handling: + output: my_error_output + # If more than 10% of records throw an error, stop the pipeline. + threshold: 0.1 + + - type: WriteToJson + input: MapToFields + config: + path: /path/to/output.json + + - type: WriteToJson + name: WriteErrorsToJson + input: MapToFields.my_error_output + config: + path: /path/to/errors.json +``` + +One can do arbitrary further processing on these failed records if desired, +e.g. + +``` +pipeline: + transforms: + - type: ReadFromCsv + config: + path: /path/to/input*.csv + + - type: MapToFields + name: ComputeRatio + input: ReadFromCsv + config: + language: python + fields: + col1: col1 + # This could raise a divide-by-zero error. + ratio: col2 / col3 + error_handling: + output: my_error_output + + - type: MapToFields + name: ComputeRatioForBadRecords + input: ComputeRatio.my_error_output + config: + language: python + fields: + col1: col1 + ratio: col2 / (col3 + 1) + error_handling: + output: still_bad + + - type: WriteToJson + # Takes as input everything form the "success" path of both transforms. Review Comment: ```suggestion # Takes as input everything from the "success" path of both transforms. ``` ########## sdks/python/apache_beam/yaml/yaml_errors.md: ########## @@ -0,0 +1,196 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Beam YAML Error Handling + +The larger one's pipeline gets, the more common it is to encounter "exceptional" +data that is malformatted, doesn't handle the proper preconditions, or otherwise +breaks during processing. Genrally any such record will cause the pipeline to Review Comment: ```suggestion breaks during processing. Generally any such record will cause the pipeline to ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
