This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c624e02b852 [CsvIO] update error and result handling. (#32023)
c624e02b852 is described below
commit c624e02b852efc8511c940977173c756f4653ed6
Author: Francis O'Hara <[email protected]>
AuthorDate: Tue Jul 30 17:36:29 2024 +0000
[CsvIO] update error and result handling. (#32023)
Co-authored-by: Lahari Guduru <[email protected]>
---
.../beam/sdk/io/csv/CsvIOParseConfiguration.java | 25 -----
.../apache/beam/sdk/io/csv/CsvIOParseError.java | 24 +++++
.../org/apache/beam/sdk/io/csv/CsvIOParseKV.java | 15 ++-
.../apache/beam/sdk/io/csv/CsvIOParseResult.java | 100 +++++++++++++++++++
.../beam/sdk/io/csv/CsvIORecordToObjects.java | 51 +++++++---
.../beam/sdk/io/csv/CsvIOStringToCsvRecord.java | 45 +++++++--
.../beam/sdk/io/csv/CsvIORecordToObjectsTest.java | 54 +++++++---
.../sdk/io/csv/CsvIOStringToCsvRecordTest.java | 109 ++++++++++++++++-----
8 files changed, 332 insertions(+), 91 deletions(-)
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
index 87e0128d73e..dd9ef5b3486 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
@@ -24,12 +24,7 @@ import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.csv.CSVFormat;
@@ -37,10 +32,6 @@ import org.apache.commons.csv.CSVFormat;
@AutoValue
abstract class CsvIOParseConfiguration<T> implements Serializable {
- /** A Dead Letter Queue that returns potential errors with {@link
BadRecord}. */
- final PTransform<PCollection<BadRecord>, PCollection<BadRecord>>
errorHandlerTransform =
- new BadRecordOutput();
-
static <T> Builder<T> builder() {
return new AutoValue_CsvIOParseConfiguration.Builder<>();
}
@@ -84,20 +75,4 @@ abstract class CsvIOParseConfiguration<T> implements Serializable {
return autoBuild();
}
}
-
- private static class BadRecordOutput
- extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {
-
- @Override
- public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
- return input.apply(ParDo.of(new BadRecordTransformFn()));
- }
-
- private static class BadRecordTransformFn extends DoFn<BadRecord,
BadRecord> {
- @ProcessElement
- public void process(@Element BadRecord input, OutputReceiver<BadRecord>
receiver) {
- receiver.output(input);
- }
- }
- }
}
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
index ad7d05912fa..7a2be9786d7 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
@@ -17,9 +17,18 @@
*/
package org.apache.beam.sdk.io.csv;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
@@ -36,6 +45,21 @@ public abstract class CsvIOParseError {
return new AutoValue_CsvIOParseError.Builder();
}
+ private static final SchemaProvider SCHEMA_PROVIDER = new AutoValueSchema();
+
+ private static final TypeDescriptor<CsvIOParseError> TYPE =
+ TypeDescriptor.of(CsvIOParseError.class);
+
+ private static final Schema SCHEMA =
checkStateNotNull(SCHEMA_PROVIDER.schemaFor(TYPE));
+
+ private static final SerializableFunction<CsvIOParseError, Row> TO_ROW_FN =
+ checkStateNotNull(SCHEMA_PROVIDER.toRowFunction(TYPE));
+
+ private static final SerializableFunction<Row, CsvIOParseError> FROM_ROW_FN =
+ checkStateNotNull(SCHEMA_PROVIDER.fromRowFunction(TYPE));
+
+ static final Coder<CsvIOParseError> CODER = SchemaCoder.of(SCHEMA, TYPE,
TO_ROW_FN, FROM_ROW_FN);
+
/** The caught {@link Exception#getMessage()}. */
public abstract String getMessage();
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java
index 1b8e43314b1..6ddafdccd9f 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java
@@ -17,9 +17,8 @@
*/
package org.apache.beam.sdk.io.csv;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVRecord;
@@ -31,11 +30,17 @@ import org.apache.commons.csv.CSVRecord;
// TODO(https://github.com/apache/beam/issues/31873): implement class after
all dependencies are
// completed.
class CsvIOParseKV<T>
- extends PTransform<PCollection<KV<String, Iterable<String>>>,
PCollection<T>> {
+ extends PTransform<PCollection<KV<String, Iterable<String>>>,
CsvIOParseResult<T>> {
+
+ private final Coder<T> outputCoder;
+
+ private CsvIOParseKV(Coder<T> outputCoder) {
+ this.outputCoder = outputCoder;
+ }
// TODO(https://github.com/apache/beam/issues/31873): implement method.
@Override
- public PCollection<T> expand(PCollection<KV<String, Iterable<String>>>
input) {
- return input.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, T>()
{}));
+ public CsvIOParseResult<T> expand(PCollection<KV<String, Iterable<String>>>
input) {
+ return CsvIOParseResult.empty(input.getPipeline(), outputCoder);
}
}
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java
new file mode 100644
index 00000000000..77264fccd2c
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The {@link T} and {@link CsvIOParseError} {@link PCollection} results of
parsing CSV records. Use
+ * {@link #getOutput()} and {@link #getErrors()} to apply these results in a
pipeline.
+ */
+public class CsvIOParseResult<T> implements POutput {
+
+ static <T> CsvIOParseResult<T> of(
+ TupleTag<T> outputTag,
+ Coder<T> outputCoder,
+ TupleTag<CsvIOParseError> errorTag,
+ PCollectionTuple pct) {
+ return new CsvIOParseResult<>(outputTag, outputCoder, errorTag, pct);
+ }
+
+ static <T> CsvIOParseResult<T> empty(Pipeline pipeline, Coder<T>
outputCoder) {
+ return new CsvIOParseResult<>(
+ new TupleTag<T>() {},
+ outputCoder,
+ new TupleTag<CsvIOParseError>() {},
+ PCollectionTuple.empty(pipeline));
+ }
+
+ private final Pipeline pipeline;
+ private final TupleTag<T> outputTag;
+ private final PCollection<T> output;
+ private final TupleTag<CsvIOParseError> errorTag;
+ private final PCollection<CsvIOParseError> errors;
+
+ private CsvIOParseResult(
+ TupleTag<T> outputTag,
+ Coder<T> outputCoder,
+ TupleTag<CsvIOParseError> errorTag,
+ PCollectionTuple pct) {
+ this.outputTag = outputTag;
+ this.errorTag = errorTag;
+ this.pipeline = pct.getPipeline();
+ this.output = pct.get(outputTag).setCoder(outputCoder);
+ this.errors = pct.get(errorTag).setCoder(CsvIOParseError.CODER);
+ }
+
+ /** The {@link T} {@link PCollection} as a result of successfully parsing
CSV records. */
+ public PCollection<T> getOutput() {
+ return output;
+ }
+
+ /**
+ * The {@link CsvIOParseError} {@link PCollection} as a result of errors
associated with parsing
+ * CSV records.
+ */
+ public PCollection<CsvIOParseError> getErrors() {
+ return errors;
+ }
+
+ @Override
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> expand() {
+ return ImmutableMap.of(
+ outputTag, output,
+ errorTag, errors);
+ }
+
+ @Override
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform) {}
+}
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java
index 4340b68f3c4..97bceb47934 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.csv;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
@@ -27,13 +28,18 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
+import org.joda.time.Instant;
/**
* {@link CsvIORecordToObjects} is a class that takes an input of {@link
PCollection<List<String>>}
* and outputs custom type {@link PCollection<T>}.
*/
-class CsvIORecordToObjects<T> extends PTransform<PCollection<List<String>>,
PCollection<T>> {
+class CsvIORecordToObjects<T> extends PTransform<PCollection<List<String>>,
CsvIOParseResult<T>> {
/** The expected {@link Schema} of the target type. */
private final Schema schema;
@@ -44,6 +50,10 @@ class CsvIORecordToObjects<T> extends PTransform<PCollection<List<String>>, PCol
/** A {@link Map} of {@link Schema.Field}s to their expected positions
within the CSV record. */
private final Map<Integer, Schema.Field> indexToFieldMap;
+ private final TupleTag<T> outputTag = new TupleTag<T>() {};
+
+ private final TupleTag<CsvIOParseError> errorTag = new
TupleTag<CsvIOParseError>() {};
+
/**
* A {@link SerializableFunction} that converts from {@link Row} to {@link
Schema} mapped custom
* type.
@@ -63,23 +73,40 @@ class CsvIORecordToObjects<T> extends PTransform<PCollection<List<String>>, PCol
}
@Override
- public PCollection<T> expand(PCollection<List<String>> input) {
- return input.apply(ParDo.of(new RecordToObjectsFn())).setCoder(coder);
+ public CsvIOParseResult<T> expand(PCollection<List<String>> input) {
+ PCollectionTuple pct =
+ input.apply(
+ RecordToObjectsFn.class.getSimpleName(),
+ ParDo.of(new RecordToObjectsFn()).withOutputTags(outputTag,
TupleTagList.of(errorTag)));
+
+ return CsvIOParseResult.of(outputTag, coder, errorTag, pct);
}
private class RecordToObjectsFn extends DoFn<List<String>, T> {
@ProcessElement
- public void process(@Element List<String> record, OutputReceiver<T>
receiver) {
+ public void process(@Element List<String> record, MultiOutputReceiver
receiver) {
Map<String, Object> fieldNamesToValues = new HashMap<>();
- for (Map.Entry<Integer, Schema.Field> entry :
indexToFieldMap.entrySet()) {
- Schema.Field field = entry.getValue();
- int index = entry.getKey();
- String cell = record.get(index);
- Object value = parseCell(cell, field);
- fieldNamesToValues.put(field.getName(), value);
+ try {
+ for (Map.Entry<Integer, Schema.Field> entry :
indexToFieldMap.entrySet()) {
+ Schema.Field field = entry.getValue();
+ int index = entry.getKey();
+ String cell = record.get(index);
+ Object value = parseCell(cell, field);
+ fieldNamesToValues.put(field.getName(), value);
+ }
+ Row row =
Row.withSchema(schema).withFieldValues(fieldNamesToValues).build();
+ receiver.get(outputTag).output(fromRowFn.apply(row));
+ } catch (RuntimeException e) {
+ receiver
+ .get(errorTag)
+ .output(
+ CsvIOParseError.builder()
+ .setCsvRecord(record.toString())
+ .setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
+ .setStackTrace(Throwables.getStackTraceAsString(e))
+ .setObservedTimestamp(Instant.now())
+ .build());
}
- Row row =
Row.withSchema(schema).withFieldValues(fieldNamesToValues).build();
- receiver.output(fromRowFn.apply(row));
}
}
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
index b5ce6a0fec2..5fc4954cb45 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.csv;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -27,9 +28,14 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
+import org.joda.time.Instant;
/**
* {@link CsvIOStringToCsvRecord} is a class that takes a {@link
PCollection<String>} input and
@@ -37,9 +43,14 @@ import org.apache.commons.csv.CSVRecord;
* targeted error detection.
*/
final class CsvIOStringToCsvRecord
- extends PTransform<PCollection<String>, PCollection<List<String>>> {
+ extends PTransform<PCollection<String>, CsvIOParseResult<List<String>>> {
+
private final CSVFormat csvFormat;
+ private final TupleTag<List<String>> outputTag = new
TupleTag<List<String>>() {};
+
+ private final TupleTag<CsvIOParseError> errorTag = new
TupleTag<CsvIOParseError>() {};
+
CsvIOStringToCsvRecord(CSVFormat csvFormat) {
this.csvFormat = csvFormat;
}
@@ -49,10 +60,15 @@ final class CsvIOStringToCsvRecord
* to Row or custom type.
*/
@Override
- public PCollection<List<String>> expand(PCollection<String> input) {
- return input
- .apply(ParDo.of(new ProcessLineToRecordFn()))
- .setCoder(ListCoder.of(NullableCoder.of(StringUtf8Coder.of())));
+ public CsvIOParseResult<List<String>> expand(PCollection<String> input) {
+ PCollectionTuple pct =
+ input.apply(
+ ProcessLineToRecordFn.class.getSimpleName(),
+ ParDo.of(new ProcessLineToRecordFn())
+ .withOutputTags(outputTag, TupleTagList.of(errorTag)));
+
+ return CsvIOParseResult.of(
+ outputTag, ListCoder.of(NullableCoder.of(StringUtf8Coder.of())),
errorTag, pct);
}
/** Processes each line in order to convert it to a {@link CSVRecord}. */
@@ -60,13 +76,24 @@ final class CsvIOStringToCsvRecord
private final String headerLine = headerLine(csvFormat);
@ProcessElement
- public void process(@Element String line, OutputReceiver<List<String>>
receiver)
- throws IOException {
+ public void process(@Element String line, MultiOutputReceiver receiver) {
if (headerLine.equals(line)) {
return;
}
- for (CSVRecord record : CSVParser.parse(line, csvFormat).getRecords()) {
- receiver.output(csvRecordtoList(record));
+ try (CSVParser csvParser = CSVParser.parse(line, csvFormat)) {
+ for (CSVRecord record : csvParser.getRecords()) {
+ receiver.get(outputTag).output(csvRecordtoList(record));
+ }
+ } catch (IOException e) {
+ receiver
+ .get(errorTag)
+ .output(
+ CsvIOParseError.builder()
+ .setCsvRecord(line)
+ .setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
+ .setObservedTimestamp(Instant.now())
+ .setStackTrace(Throwables.getStackTraceAsString(e))
+ .build());
}
}
}
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java
index eb8cacdec5a..9ccb5d0c7bc 100644
--- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java
@@ -35,7 +35,6 @@ import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrim
import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContaining;
import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingFromRowFn;
import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn;
-import static org.junit.Assert.assertThrows;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -55,6 +54,7 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -135,7 +135,9 @@ public class CsvIORecordToObjectsTest {
emptyCustomProcessingMap(),
ROW_ROW_SERIALIZABLE_FUNCTION,
ALL_PRIMITIVE_DATA_TYPES_ROW_CODER);
- PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+ CsvIOParseResult<Row> result = input.apply(underTest);
+ PAssert.that(result.getOutput()).containsInAnyOrder(want);
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -152,7 +154,9 @@ public class CsvIORecordToObjectsTest {
emptyCustomProcessingMap(),
allPrimitiveDataTypesFromRowFn(),
ALL_PRIMITIVE_DATA_TYPES_CODER);
- PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+ CsvIOParseResult<AllPrimitiveDataTypes> result = input.apply(underTest);
+ PAssert.that(result.getOutput()).containsInAnyOrder(want);
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -176,7 +180,9 @@ public class CsvIORecordToObjectsTest {
emptyCustomProcessingMap(),
ROW_ROW_SERIALIZABLE_FUNCTION,
NULLABLE_ALL_PRIMITIVE_DATA_TYPES_ROW_CODER);
- PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+ CsvIOParseResult<Row> result = input.apply(underTest);
+ PAssert.that(result.getOutput()).containsInAnyOrder(want);
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -193,7 +199,9 @@ public class CsvIORecordToObjectsTest {
emptyCustomProcessingMap(),
nullableAllPrimitiveDataTypesFromRowFn(),
NULLABLE_ALL_PRIMITIVE_DATA_TYPES_CODER);
- PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+ CsvIOParseResult<NullableAllPrimitiveDataTypes> result =
input.apply(underTest);
+ PAssert.that(result.getOutput()).containsInAnyOrder(want);
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -214,8 +222,10 @@ public class CsvIORecordToObjectsTest {
emptyCustomProcessingMap(),
allPrimitiveDataTypesFromRowFn(),
ALL_PRIMITIVE_DATA_TYPES_CODER);
- input.apply(underTest);
- assertThrows(Pipeline.PipelineExecutionException.class, pipeline::run);
+ CsvIOParseResult<AllPrimitiveDataTypes> result = input.apply(underTest);
+ PAssert.that(result.getOutput()).empty();
+
PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L);
+ pipeline.run();
}
@Test
@@ -228,7 +238,9 @@ public class CsvIORecordToObjectsTest {
emptyCustomProcessingMap(),
ROW_ROW_SERIALIZABLE_FUNCTION,
NULLABLE_ALL_PRIMITIVE_DATA_TYPES_ROW_CODER);
- PAssert.that(input.apply(underTest)).empty();
+ CsvIOParseResult<Row> result = input.apply(underTest);
+ PAssert.that(result.getOutput()).empty();
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -242,7 +254,9 @@ public class CsvIORecordToObjectsTest {
emptyCustomProcessingMap(),
allPrimitiveDataTypesFromRowFn(),
ALL_PRIMITIVE_DATA_TYPES_CODER);
- PAssert.that(input.apply(underTest)).empty();
+ CsvIOParseResult<AllPrimitiveDataTypes> result = input.apply(underTest);
+ PAssert.that(result.getOutput()).empty();
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -270,7 +284,9 @@ public class CsvIORecordToObjectsTest {
timeContainingCustomProcessingMap(),
ROW_ROW_SERIALIZABLE_FUNCTION,
TIME_CONTAINING_ROW_CODER);
- PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+ CsvIOParseResult<Row> result = input.apply(underTest);
+ PAssert.that(result.getOutput()).containsInAnyOrder(want);
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -295,7 +311,9 @@ public class CsvIORecordToObjectsTest {
timeContainingCustomProcessingMap(),
timeContainingFromRowFn(),
TIME_CONTAINING_POJO_CODER);
- PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+ CsvIOParseResult<TimeContaining> result = input.apply(underTest);
+ PAssert.that(result.getOutput()).containsInAnyOrder(want);
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -310,8 +328,11 @@ public class CsvIORecordToObjectsTest {
emptyCustomProcessingMap(),
allPrimitiveDataTypesFromRowFn(),
ALL_PRIMITIVE_DATA_TYPES_CODER);
- input.apply(underTest);
- assertThrows(Pipeline.PipelineExecutionException.class, pipeline::run);
+ CsvIOParseResult<AllPrimitiveDataTypes> result = input.apply(underTest);
+ PAssert.that(result.getOutput()).empty();
+
PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L);
+
+ pipeline.run();
}
@Test
@@ -328,8 +349,11 @@ public class CsvIORecordToObjectsTest {
timeContainingCustomProcessingMap(),
timeContainingFromRowFn(),
TIME_CONTAINING_POJO_CODER);
- input.apply(underTest);
- assertThrows(Pipeline.PipelineExecutionException.class, pipeline::run);
+ CsvIOParseResult<TimeContaining> result = input.apply(underTest);
+ PAssert.that(result.getOutput()).empty();
+
PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L);
+
+ pipeline.run();
}
private static PCollection<List<String>> csvRecords(Pipeline pipeline,
String... cells) {
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
index 1b81391c4fb..7cbba3335dd 100644
--- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.io.csv.CsvIOStringToCsvRecord.headerLine;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -47,12 +48,14 @@ public class CsvIOStringToCsvRecordTest {
Create.of(headerLine(csvFormat), "#should skip me", "a,1,1.1",
"b,2,2.2", "c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -65,13 +68,15 @@ public class CsvIOStringToCsvRecordTest {
Create.of(headerLine(csvFormat), "#comment", "a,1,1.1", "b,2,2.2",
"c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Collections.singletonList("#comment"),
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -83,12 +88,14 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "a;1;1.1", "b;2;2.2",
"c;3;3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -100,12 +107,14 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "a$,b,1,1.1",
"b,2,2.2", "c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a,b", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -117,12 +126,14 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "b,2,2.2",
"c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -134,12 +145,14 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "",
"b,2,2.2", "", "c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -151,12 +164,14 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "",
"b,2,2.2", "", "c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -173,12 +188,14 @@ public class CsvIOStringToCsvRecordTest {
"c,3, 3.3 "));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -195,12 +212,14 @@ public class CsvIOStringToCsvRecordTest {
"c,3, 3.3 "));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList(" a ", "1", "1.1"),
Arrays.asList("b", " 2 ", "2.2"),
Arrays.asList("c", "3", " 3.3 ")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -212,12 +231,14 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "a,1,🐼", "b,🐼,2.2",
"🐼,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", null),
Arrays.asList("b", null, "2.2"),
Arrays.asList(null, "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -229,12 +250,14 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "a,1,🐼", "b,🐼,2.2",
"🐼,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "🐼"),
Arrays.asList("b", "🐼", "2.2"),
Arrays.asList("🐼", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -246,12 +269,15 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), ":a,:,1,1.1",
"b,2,2.2", "c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a,", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
+
pipeline.run();
}
@@ -267,12 +293,14 @@ public class CsvIOStringToCsvRecordTest {
"\"c\",\"3\",\"3.3\""));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -289,12 +317,14 @@ public class CsvIOStringToCsvRecordTest {
"\"c\",\"3\",\"3.3\""));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", null),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -306,12 +336,15 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "\"a,\",1,1.1",
"b,2,2.2", "c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a,", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
+
pipeline.run();
}
@@ -323,12 +356,15 @@ public class CsvIOStringToCsvRecordTest {
Create.of(headerLine(csvFormat), "\"a\",1,1.1", "\"b\",2,2.2",
"\"c\",3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
+
pipeline.run();
}
@@ -339,12 +375,15 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "b,2,2.2",
"c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
+
pipeline.run();
}
@@ -355,10 +394,13 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat),
"a,1,1.1😆b,2,2.2😆c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Collections.singletonList(
Arrays.asList("a", "1", "1.1😆b", "2", "2.2😆c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
+
pipeline.run();
}
@@ -373,12 +415,15 @@ public class CsvIOStringToCsvRecordTest {
"a,1,1.1" + systemRecordSeparator + "b,2,2.2" +
systemRecordSeparator + "c,3,3.3"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
+
pipeline.run();
}
@@ -389,12 +434,15 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1,",
"b,2,2.2,", "c,3,3.3,"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
+
pipeline.run();
}
@@ -405,12 +453,15 @@ public class CsvIOStringToCsvRecordTest {
pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1,",
"b,2,2.2,", "c,3,3.3,"));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1", ""),
Arrays.asList("b", "2", "2.2", ""),
Arrays.asList("c", "3", "3.3", "")));
+ PAssert.that(result.getErrors()).empty();
+
pipeline.run();
}
@@ -426,12 +477,14 @@ public class CsvIOStringToCsvRecordTest {
"c,3, 3.3 "));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a", "1", "1.1"),
Arrays.asList("b", "2", "2.2"),
Arrays.asList("c", "3", "3.3")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -448,12 +501,14 @@ public class CsvIOStringToCsvRecordTest {
"c,3, 3.3 "));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList(" a ", "1", "1.1"),
Arrays.asList("b", " 2 ", "2.2"),
Arrays.asList("c", "3", " 3.3 ")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -464,8 +519,10 @@ public class CsvIOStringToCsvRecordTest {
PCollection<String> input = pipeline.apply(Create.of(csvRecord));
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat());
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(Collections.singletonList(Arrays.asList("a",
"1")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}
@@ -478,12 +535,14 @@ public class CsvIOStringToCsvRecordTest {
CsvIOStringToCsvRecord underTest =
new CsvIOStringToCsvRecord(csvFormat().withRecordSeparator('\n'));
- PAssert.that(input.apply(underTest))
+ CsvIOParseResult<List<String>> result = input.apply(underTest);
+ PAssert.that(result.getOutput())
.containsInAnyOrder(
Arrays.asList(
Arrays.asList("a\r\n1", "a\r\n2"),
Arrays.asList("b\r\n1", "b\r\n2"),
Arrays.asList("c\r\n1", "c\r\n2")));
+ PAssert.that(result.getErrors()).empty();
pipeline.run();
}