nickuncaged1201 commented on code in PR #27039:
URL: https://github.com/apache/beam/pull/27039#discussion_r1231376067
##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java:
##########
@@ -61,6 +73,49 @@ static MapElements<Row, GenericRecord> mapRowsToGenericRecords(Schema beamSchema
.via(AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(beamSchema)));
}
+ // Applies generic mapping from Beam row to other data types through the provided mapFn.
+ // Implements error handling with metrics and DLQ support.
+ // Arguments:
+ // name: the metric name to use.
+ // mapFn: the mapping function for mapping from Beam row to other data types.
+ // outputTag: TupleTag for output. Used to direct output to the correct output, or, in the
+ // case of an error, to a DLQ.
+ static class ErrorCounterFn<OutputT extends Object> extends DoFn<Row, OutputT> {
+ private SerializableFunction<Row, OutputT> mapFn;
+ private Counter errorCounter;
+ private TupleTag<OutputT> outputTag;
+ private long errorsInBundle = 0L;
+
+ public ErrorCounterFn(
+ String name, SerializableFunction<Row, OutputT> mapFn, TupleTag<OutputT> outputTag) {
+ errorCounter = Metrics.counter(FileWriteSchemaTransformFormatProvider.class, name);
+ this.mapFn = mapFn;
+ this.outputTag = outputTag;
+ }
+
+ @ProcessElement
+ public void process(@DoFn.Element Row row, MultiOutputReceiver receiver) {
+ try {
+ receiver.get(outputTag).output(mapFn.apply(row));
+ } catch (Exception e) {
+ errorsInBundle += 1;
+ LOG.warn("Error while parsing input element", e);
Review Comment:
For the file write transforms, every mapper function is some kind of conversion parser from a Beam Row, whether it converts to Avro GenericRecords, JSON, etc. So I think this log message is valid within this scope.
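A minimal, framework-free Java sketch of the pattern discussed above: apply a conversion and, on any exception, count the error and route the failed element to a dead-letter list instead of failing. It assumes a hypothetical `ErrorRoutingSketch` class; plain Java collections and a `long` stand in for Beam's `MultiOutputReceiver`/`TupleTag` outputs and `Counter`, and `Integer::parseInt` stands in for a Row-to-GenericRecord or JSON conversion. Because the diff above is cut off inside the `catch` block, routing the failed element to a DLQ here is an assumption based on the "DLQ support" comment, not the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, framework-free stand-in for ErrorCounterFn: no Beam classes are used.
final class ErrorRoutingSketch<InputT, OutputT> {

  // A failed input element paired with the reason its conversion failed.
  static final class DeadLetter<T> {
    final T element;
    final String error;

    DeadLetter(T element, String error) {
      this.element = element;
      this.error = error;
    }
  }

  private final Function<InputT, OutputT> mapFn; // stands in for SerializableFunction<Row, OutputT>
  private final List<OutputT> outputs = new ArrayList<>(); // stands in for receiver.get(outputTag)
  private final List<DeadLetter<InputT>> deadLetters = new ArrayList<>(); // assumed DLQ output
  private long errorCount = 0L; // stands in for the Beam Counter

  ErrorRoutingSketch(Function<InputT, OutputT> mapFn) {
    this.mapFn = mapFn;
  }

  // Applies mapFn; on any exception, counts the error and routes the input to the dead-letter list.
  void process(InputT element) {
    try {
      outputs.add(mapFn.apply(element));
    } catch (Exception e) {
      errorCount += 1;
      deadLetters.add(new DeadLetter<>(element, String.valueOf(e.getMessage())));
    }
  }

  List<OutputT> outputs() {
    return outputs;
  }

  List<DeadLetter<InputT>> deadLetters() {
    return deadLetters;
  }

  long errorCount() {
    return errorCount;
  }
}
```

For example, with `Integer::parseInt` as the conversion, processing "1", "x" and "3" leaves [1, 3] in the outputs, one dead letter for "x", and an error count of 1. Catching `Exception` rather than `Throwable`, as the diff does, still lets JVM `Error`s fail the work.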
--
This is an automated message from the Apache Git Service.