johnjcasey commented on code in PR #27039:
URL: https://github.com/apache/beam/pull/27039#discussion_r1223092440
##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/CsvWriteSchemaTransformFormatProvider.java:
##########
@@ -83,9 +85,11 @@ public PCollection<String> expand(PCollection<Row> input) {
}
WriteFilesResult<String> result = input.apply("Row to CSV", write);
- return result
- .getPerDestinationOutputFilenames()
- .apply("perDestinationOutputFilenames", Values.create());
+ PCollection<String> output =
+ result
+ .getPerDestinationOutputFilenames()
+ .apply("perDestinationOutputFilenames", Values.create());
+ return PCollectionTuple.of(RESULT_TAG, output);
Review Comment:
Should this tuple not also contain an error tag?
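A framework-free sketch of why a uniform output shape could matter. It assumes downstream code reads the error tag from every format provider's output in the same way. A `Map` from tag name to elements stands in for `PCollectionTuple`, and `TaggedOutputs` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free model: a Map from tag name to elements stands in for a
// PCollectionTuple. Tag names and methods here are hypothetical.
final class TaggedOutputs {
  static final String RESULT_TAG = "result";
  static final String ERROR_TAG = "errors";

  // Shape returned by the CSV provider in this hunk: only the result tag.
  static Map<String, List<String>> csvResultOnly(List<String> fileNames) {
    Map<String, List<String>> tuple = new HashMap<>();
    tuple.put(RESULT_TAG, fileNames);
    return tuple;
  }

  // Shape the comment asks about: the result tag plus a (possibly empty) error tag.
  static Map<String, List<String>> csvWithErrorTag(List<String> fileNames, List<String> errors) {
    Map<String, List<String>> tuple = csvResultOnly(fileNames);
    tuple.put(ERROR_TAG, errors);
    return tuple;
  }

  // A caller that handles every format provider the same way reads both tags,
  // so a provider that omits the error tag breaks it.
  static int errorCount(Map<String, List<String>> tuple) {
    List<String> errors = tuple.get(ERROR_TAG);
    if (errors == null) {
      throw new IllegalArgumentException("tuple has no " + ERROR_TAG + " tag");
    }
    return errors.size();
  }
}
```

The Avro provider's ParDo already declares ERROR_TAG as an additional output. In this model, giving the CSV provider the same tag, even one that is always empty, would keep callers free of per-format special cases.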
##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java:
##########
@@ -61,6 +73,49 @@ static MapElements<Row, GenericRecord> mapRowsToGenericRecords(Schema beamSchema
.via(AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(beamSchema)));
}
+ // Applies generic mapping from Beam row to other data types through the provided mapFn.
+ // Implements error handling with metrics and DLQ support.
+ // Arguments:
+ // name: the metric name to use.
+ // mapFn: the mapping function for mapping from Beam row to other data types.
+ // outputTag: TupleTag for output. Used to direct output to correct output source, or in the
+ // case of error, a DLQ.
+ static class ErrorCounterFn<OutputT extends Object> extends DoFn<Row, OutputT> {
+ private SerializableFunction<Row, OutputT> mapFn;
+ private Counter errorCounter;
+ private TupleTag<OutputT> outputTag;
+ private long errorsInBundle = 0L;
+
+ public ErrorCounterFn(
+ String name, SerializableFunction<Row, OutputT> mapFn, TupleTag<OutputT> outputTag) {
+ errorCounter = Metrics.counter(FileWriteSchemaTransformFormatProvider.class, name);
+ this.mapFn = mapFn;
+ this.outputTag = outputTag;
+ }
+
+ @ProcessElement
+ public void process(@DoFn.Element Row row, MultiOutputReceiver receiver) {
+ try {
+ receiver.get(outputTag).output(mapFn.apply(row));
+ } catch (Exception e) {
+ errorsInBundle += 1;
+ LOG.warn("Error while parsing input element", e);
Review Comment:
Is this log message always going to be accurate? I'd expect some mapper functions not to be parsers.
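A minimal sketch of one option, written without Beam. It derives the wording from the step being applied, for example a step description passed alongside the metric name, instead of hard-coding "parsing". `MapFailure` and its fields are hypothetical.

```java
// Hypothetical failure record for the dead-letter output. Its message names the
// mapping step instead of assuming every mapFn is a parser.
final class MapFailure {
  final String stepName;
  final String element;      // kept so the dead-letter record can be inspected later
  final String errorType;
  final String errorMessage;

  MapFailure(String stepName, Object element, Exception error) {
    this.stepName = stepName;
    this.element = String.valueOf(element);
    this.errorType = error.getClass().getSimpleName();
    this.errorMessage = String.valueOf(error.getMessage());
  }

  // Neutral wording that holds whether mapFn parses, converts, or formats.
  String logMessage() {
    return "Error while applying " + stepName + " to input element: "
        + errorType + ": " + errorMessage;
  }
}
```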
##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroWriteSchemaTransformFormatProvider.java:
##########
@@ -47,22 +57,27 @@ public String identifier() {
* {@link PCollection} file names written using {@link AvroIO.Write}.
*/
@Override
- public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+ public PTransform<PCollection<Row>, PCollectionTuple> buildTransform(
FileWriteSchemaTransformConfiguration configuration, Schema schema) {
- return new PTransform<PCollection<Row>, PCollection<String>>() {
+ return new PTransform<PCollection<Row>, PCollectionTuple>() {
@Override
- public PCollection<String> expand(PCollection<Row> input) {
+ public PCollectionTuple expand(PCollection<Row> input) {
org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
AvroGenericCoder coder = AvroGenericCoder.of(avroSchema);
- PCollection<GenericRecord> avro =
- input
- .apply(
- "Row To Avro Generic Record",
- FileWriteSchemaTransformFormatProviders.mapRowsToGenericRecords(schema))
- .setCoder(coder);
+ PCollectionTuple tuple =
+ input.apply(
+ "Row To Avro Generic Record",
+ ParDo.of(
+ new ErrorCounterFn<GenericRecord>(
+ "Avro-write-error-counter",
+ AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(schema)),
+ ERROR_FN_OUPUT_TAG))
+ .withOutputTags(ERROR_FN_OUPUT_TAG, TupleTagList.of(ERROR_TAG)));
Review Comment:
I'm a bit confused by this. How is it counting errors upstream of where you are writing to Avro?
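A framework-free sketch of what the counter can observe. It assumes, as the diff suggests, that ErrorCounterFn wraps only the Row-to-GenericRecord conversion and that the AvroIO write runs afterward. `TwoStageModel` is hypothetical and reduces both stages to in-memory loops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Framework-free model of the two stages in this hunk. Names are hypothetical.
final class TwoStageModel {
  long conversionErrors = 0;                          // what the counter can observe
  final List<String> deadLetters = new ArrayList<>();

  <I, O> void run(List<I> input, Function<I, O> convert, Consumer<O> write) {
    // Stage 1: guarded conversion, like ErrorCounterFn wrapping the
    // Row-to-GenericRecord function.
    List<O> converted = new ArrayList<>();
    for (I element : input) {
      try {
        converted.add(convert.apply(element));
      } catch (RuntimeException e) {
        conversionErrors++;
        deadLetters.add(String.valueOf(element));
      }
    }
    // Stage 2: unguarded write, standing in for AvroIO. A failure here
    // propagates to the caller and is never added to conversionErrors.
    for (O record : converted) {
      write.accept(record);
    }
  }
}
```

In this model, the counter measures conversion failures only. Failures during the write itself never reach it.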
##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java:
##########
@@ -61,6 +73,49 @@ static MapElements<Row, GenericRecord> mapRowsToGenericRecords(Schema beamSchema
.via(AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(beamSchema)));
}
+ // Applies generic mapping from Beam row to other data types through the provided mapFn.
+ // Implements error handling with metrics and DLQ support.
+ // Arguments:
+ // name: the metric name to use.
+ // mapFn: the mapping function for mapping from Beam row to other data types.
+ // outputTag: TupleTag for output. Used to direct output to correct output source, or in the
+ // case of error, a DLQ.
+ static class ErrorCounterFn<OutputT extends Object> extends DoFn<Row, OutputT> {
Review Comment:
Also, we need to write tests for this class.
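A framework-free sketch of the cases such tests might cover, assuming the class keeps its current behavior. Successful mappings should reach the main output. Failures should reach the error output without stopping the rest of the input, and the counter should match the number of failures. In Beam itself, these tests would more naturally use `TestPipeline` with `PAssert` on each tagged output. `ErrorCounterFnModel` and `ErrorCounterFnCases` are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Framework-free stand-in for ErrorCounterFn: applies mapFn, sends successes to
// `outputs`, failures to `errors`, and counts failures. Hypothetical.
final class ErrorCounterFnModel<I, O> {
  private final Function<I, O> mapFn;
  final List<O> outputs = new ArrayList<>();
  final List<I> errors = new ArrayList<>();
  long errorCount = 0;

  ErrorCounterFnModel(Function<I, O> mapFn) {
    this.mapFn = mapFn;
  }

  void process(I element) {
    try {
      outputs.add(mapFn.apply(element));
    } catch (RuntimeException e) {
      errorCount++;
      errors.add(element);
    }
  }
}

// Cases a test for the real DoFn would likely want to cover.
final class ErrorCounterFnCases {
  static void check(boolean condition, String what) {
    if (!condition) throw new AssertionError(what);
  }

  static void runAll() {
    // 1. Every element maps cleanly: nothing on the error side.
    ErrorCounterFnModel<String, Integer> clean = new ErrorCounterFnModel<>(Integer::parseInt);
    for (String s : List.of("1", "2")) clean.process(s);
    check(clean.outputs.equals(List.of(1, 2)), "clean outputs");
    check(clean.errors.isEmpty() && clean.errorCount == 0, "clean errors");

    // 2. A failing element goes to the error side and does not stop the rest.
    ErrorCounterFnModel<String, Integer> mixed = new ErrorCounterFnModel<>(Integer::parseInt);
    for (String s : List.of("1", "oops", "3")) mixed.process(s);
    check(mixed.outputs.equals(List.of(1, 3)), "mixed outputs");
    check(mixed.errors.equals(List.of("oops")), "mixed errors");

    // 3. The counter matches the number of elements routed to the error side.
    check(mixed.errorCount == mixed.errors.size(), "counter matches error output");
  }
}
```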
##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java:
##########
@@ -61,6 +73,49 @@ static MapElements<Row, GenericRecord> mapRowsToGenericRecords(Schema beamSchema
.via(AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(beamSchema)));
}
+ // Applies generic mapping from Beam row to other data types through the provided mapFn.
+ // Implements error handling with metrics and DLQ support.
+ // Arguments:
+ // name: the metric name to use.
+ // mapFn: the mapping function for mapping from Beam row to other data types.
+ // outputTag: TupleTag for output. Used to direct output to correct output source, or in the
+ // case of error, a DLQ.
+ static class ErrorCounterFn<OutputT extends Object> extends DoFn<Row, OutputT> {
Review Comment:
This name doesn't seem right. This transform is a .map() that collects errors on a separate tag.
--
This is an automated message from the Apache Git Service.