This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c624e02b852 [CsvIO] update error and result handling. (#32023)
c624e02b852 is described below

commit c624e02b852efc8511c940977173c756f4653ed6
Author: Francis O'Hara <[email protected]>
AuthorDate: Tue Jul 30 17:36:29 2024 +0000

    [CsvIO] update error and result handling. (#32023)
    
    Co-authored-by: Lahari Guduru 
<[email protected]>
---
 .../beam/sdk/io/csv/CsvIOParseConfiguration.java   |  25 -----
 .../apache/beam/sdk/io/csv/CsvIOParseError.java    |  24 +++++
 .../org/apache/beam/sdk/io/csv/CsvIOParseKV.java   |  15 ++-
 .../apache/beam/sdk/io/csv/CsvIOParseResult.java   | 100 +++++++++++++++++++
 .../beam/sdk/io/csv/CsvIORecordToObjects.java      |  51 +++++++---
 .../beam/sdk/io/csv/CsvIOStringToCsvRecord.java    |  45 +++++++--
 .../beam/sdk/io/csv/CsvIORecordToObjectsTest.java  |  54 +++++++---
 .../sdk/io/csv/CsvIOStringToCsvRecordTest.java     | 109 ++++++++++++++++-----
 8 files changed, 332 insertions(+), 91 deletions(-)

diff --git 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
index 87e0128d73e..dd9ef5b3486 100644
--- 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
+++ 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
@@ -24,12 +24,7 @@ import java.util.Map;
 import java.util.Optional;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.commons.csv.CSVFormat;
 
@@ -37,10 +32,6 @@ import org.apache.commons.csv.CSVFormat;
 @AutoValue
 abstract class CsvIOParseConfiguration<T> implements Serializable {
 
-  /** A Dead Letter Queue that returns potential errors with {@link 
BadRecord}. */
-  final PTransform<PCollection<BadRecord>, PCollection<BadRecord>> 
errorHandlerTransform =
-      new BadRecordOutput();
-
   static <T> Builder<T> builder() {
     return new AutoValue_CsvIOParseConfiguration.Builder<>();
   }
@@ -84,20 +75,4 @@ abstract class CsvIOParseConfiguration<T> implements 
Serializable {
       return autoBuild();
     }
   }
-
-  private static class BadRecordOutput
-      extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {
-
-    @Override
-    public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
-      return input.apply(ParDo.of(new BadRecordTransformFn()));
-    }
-
-    private static class BadRecordTransformFn extends DoFn<BadRecord, 
BadRecord> {
-      @ProcessElement
-      public void process(@Element BadRecord input, OutputReceiver<BadRecord> 
receiver) {
-        receiver.output(input);
-      }
-    }
-  }
 }
diff --git 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
index ad7d05912fa..7a2be9786d7 100644
--- 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
+++ 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
@@ -17,9 +17,18 @@
  */
 package org.apache.beam.sdk.io.csv;
 
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
 import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaProvider;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
@@ -36,6 +45,21 @@ public abstract class CsvIOParseError {
     return new AutoValue_CsvIOParseError.Builder();
   }
 
+  private static final SchemaProvider SCHEMA_PROVIDER = new AutoValueSchema();
+
+  private static final TypeDescriptor<CsvIOParseError> TYPE =
+      TypeDescriptor.of(CsvIOParseError.class);
+
+  private static final Schema SCHEMA = 
checkStateNotNull(SCHEMA_PROVIDER.schemaFor(TYPE));
+
+  private static final SerializableFunction<CsvIOParseError, Row> TO_ROW_FN =
+      checkStateNotNull(SCHEMA_PROVIDER.toRowFunction(TYPE));
+
+  private static final SerializableFunction<Row, CsvIOParseError> FROM_ROW_FN =
+      checkStateNotNull(SCHEMA_PROVIDER.fromRowFunction(TYPE));
+
+  static final Coder<CsvIOParseError> CODER = SchemaCoder.of(SCHEMA, TYPE, 
TO_ROW_FN, FROM_ROW_FN);
+
   /** The caught {@link Exception#getMessage()}. */
   public abstract String getMessage();
 
diff --git 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java
index 1b8e43314b1..6ddafdccd9f 100644
--- 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java
+++ 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java
@@ -17,9 +17,8 @@
  */
 package org.apache.beam.sdk.io.csv;
 
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.csv.CSVRecord;
@@ -31,11 +30,17 @@ import org.apache.commons.csv.CSVRecord;
 // TODO(https://github.com/apache/beam/issues/31873): implement class after 
all dependencies are
 // completed.
 class CsvIOParseKV<T>
-    extends PTransform<PCollection<KV<String, Iterable<String>>>, 
PCollection<T>> {
+    extends PTransform<PCollection<KV<String, Iterable<String>>>, 
CsvIOParseResult<T>> {
+
+  private final Coder<T> outputCoder;
+
+  private CsvIOParseKV(Coder<T> outputCoder) {
+    this.outputCoder = outputCoder;
+  }
 
   // TODO(https://github.com/apache/beam/issues/31873): implement method.
   @Override
-  public PCollection<T> expand(PCollection<KV<String, Iterable<String>>> 
input) {
-    return input.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, T>() 
{}));
+  public CsvIOParseResult<T> expand(PCollection<KV<String, Iterable<String>>> 
input) {
+    return CsvIOParseResult.empty(input.getPipeline(), outputCoder);
   }
 }
diff --git 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java
 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java
new file mode 100644
index 00000000000..77264fccd2c
--- /dev/null
+++ 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The {@link T} and {@link CsvIOParseError} {@link PCollection} results of 
parsing CSV records. Use
+ * {@link #getOutput()} and {@link #getErrors()} to apply these results in a 
pipeline.
+ */
+public class CsvIOParseResult<T> implements POutput {
+
+  static <T> CsvIOParseResult<T> of(
+      TupleTag<T> outputTag,
+      Coder<T> outputCoder,
+      TupleTag<CsvIOParseError> errorTag,
+      PCollectionTuple pct) {
+    return new CsvIOParseResult<>(outputTag, outputCoder, errorTag, pct);
+  }
+
+  static <T> CsvIOParseResult<T> empty(Pipeline pipeline, Coder<T> 
outputCoder) {
+    return new CsvIOParseResult<>(
+        new TupleTag<T>() {},
+        outputCoder,
+        new TupleTag<CsvIOParseError>() {},
+        PCollectionTuple.empty(pipeline));
+  }
+
+  private final Pipeline pipeline;
+  private final TupleTag<T> outputTag;
+  private final PCollection<T> output;
+  private final TupleTag<CsvIOParseError> errorTag;
+  private final PCollection<CsvIOParseError> errors;
+
+  private CsvIOParseResult(
+      TupleTag<T> outputTag,
+      Coder<T> outputCoder,
+      TupleTag<CsvIOParseError> errorTag,
+      PCollectionTuple pct) {
+    this.outputTag = outputTag;
+    this.errorTag = errorTag;
+    this.pipeline = pct.getPipeline();
+    this.output = pct.get(outputTag).setCoder(outputCoder);
+    this.errors = pct.get(errorTag).setCoder(CsvIOParseError.CODER);
+  }
+
+  /** The {@link T} {@link PCollection} as a result of successfully parsing 
CSV records. */
+  public PCollection<T> getOutput() {
+    return output;
+  }
+
+  /**
+   * The {@link CsvIOParseError} {@link PCollection} as a result of errors 
associated with parsing
+   * CSV records.
+   */
+  public PCollection<CsvIOParseError> getErrors() {
+    return errors;
+  }
+
+  @Override
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  @Override
+  public Map<TupleTag<?>, PValue> expand() {
+    return ImmutableMap.of(
+        outputTag, output,
+        errorTag, errors);
+  }
+
+  @Override
+  public void finishSpecifyingOutput(
+      String transformName, PInput input, PTransform<?, ?> transform) {}
+}
diff --git 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java
 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java
index 4340b68f3c4..97bceb47934 100644
--- 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java
+++ 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.csv;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -27,13 +28,18 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
+import org.joda.time.Instant;
 
 /**
  * {@link CsvIORecordToObjects} is a class that takes an input of {@link 
PCollection<List<String>>}
  * and outputs custom type {@link PCollection<T>}.
  */
-class CsvIORecordToObjects<T> extends PTransform<PCollection<List<String>>, 
PCollection<T>> {
+class CsvIORecordToObjects<T> extends PTransform<PCollection<List<String>>, 
CsvIOParseResult<T>> {
 
   /** The expected {@link Schema} of the target type. */
   private final Schema schema;
@@ -44,6 +50,10 @@ class CsvIORecordToObjects<T> extends 
PTransform<PCollection<List<String>>, PCol
   /** A {@link Map} of {@link Schema.Field}s to their expected positions 
within the CSV record. */
   private final Map<Integer, Schema.Field> indexToFieldMap;
 
+  private final TupleTag<T> outputTag = new TupleTag<T>() {};
+
+  private final TupleTag<CsvIOParseError> errorTag = new 
TupleTag<CsvIOParseError>() {};
+
   /**
    * A {@link SerializableFunction} that converts from {@link Row} to {@link 
Schema} mapped custom
    * type.
@@ -63,23 +73,40 @@ class CsvIORecordToObjects<T> extends 
PTransform<PCollection<List<String>>, PCol
   }
 
   @Override
-  public PCollection<T> expand(PCollection<List<String>> input) {
-    return input.apply(ParDo.of(new RecordToObjectsFn())).setCoder(coder);
+  public CsvIOParseResult<T> expand(PCollection<List<String>> input) {
+    PCollectionTuple pct =
+        input.apply(
+            RecordToObjectsFn.class.getSimpleName(),
+            ParDo.of(new RecordToObjectsFn()).withOutputTags(outputTag, 
TupleTagList.of(errorTag)));
+
+    return CsvIOParseResult.of(outputTag, coder, errorTag, pct);
   }
 
   private class RecordToObjectsFn extends DoFn<List<String>, T> {
     @ProcessElement
-    public void process(@Element List<String> record, OutputReceiver<T> 
receiver) {
+    public void process(@Element List<String> record, MultiOutputReceiver 
receiver) {
       Map<String, Object> fieldNamesToValues = new HashMap<>();
-      for (Map.Entry<Integer, Schema.Field> entry : 
indexToFieldMap.entrySet()) {
-        Schema.Field field = entry.getValue();
-        int index = entry.getKey();
-        String cell = record.get(index);
-        Object value = parseCell(cell, field);
-        fieldNamesToValues.put(field.getName(), value);
+      try {
+        for (Map.Entry<Integer, Schema.Field> entry : 
indexToFieldMap.entrySet()) {
+          Schema.Field field = entry.getValue();
+          int index = entry.getKey();
+          String cell = record.get(index);
+          Object value = parseCell(cell, field);
+          fieldNamesToValues.put(field.getName(), value);
+        }
+        Row row = 
Row.withSchema(schema).withFieldValues(fieldNamesToValues).build();
+        receiver.get(outputTag).output(fromRowFn.apply(row));
+      } catch (RuntimeException e) {
+        receiver
+            .get(errorTag)
+            .output(
+                CsvIOParseError.builder()
+                    .setCsvRecord(record.toString())
+                    .setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
+                    .setStackTrace(Throwables.getStackTraceAsString(e))
+                    .setObservedTimestamp(Instant.now())
+                    .build());
       }
-      Row row = 
Row.withSchema(schema).withFieldValues(fieldNamesToValues).build();
-      receiver.output(fromRowFn.apply(row));
     }
   }
 
diff --git 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
index b5ce6a0fec2..5fc4954cb45 100644
--- 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
+++ 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.csv;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -27,9 +28,14 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
+import org.joda.time.Instant;
 
 /**
  * {@link CsvIOStringToCsvRecord} is a class that takes a {@link 
PCollection<String>} input and
@@ -37,9 +43,14 @@ import org.apache.commons.csv.CSVRecord;
  * targeted error detection.
  */
 final class CsvIOStringToCsvRecord
-    extends PTransform<PCollection<String>, PCollection<List<String>>> {
+    extends PTransform<PCollection<String>, CsvIOParseResult<List<String>>> {
+
   private final CSVFormat csvFormat;
 
+  private final TupleTag<List<String>> outputTag = new 
TupleTag<List<String>>() {};
+
+  private final TupleTag<CsvIOParseError> errorTag = new 
TupleTag<CsvIOParseError>() {};
+
   CsvIOStringToCsvRecord(CSVFormat csvFormat) {
     this.csvFormat = csvFormat;
   }
@@ -49,10 +60,15 @@ final class CsvIOStringToCsvRecord
    * to Row or custom type.
    */
   @Override
-  public PCollection<List<String>> expand(PCollection<String> input) {
-    return input
-        .apply(ParDo.of(new ProcessLineToRecordFn()))
-        .setCoder(ListCoder.of(NullableCoder.of(StringUtf8Coder.of())));
+  public CsvIOParseResult<List<String>> expand(PCollection<String> input) {
+    PCollectionTuple pct =
+        input.apply(
+            ProcessLineToRecordFn.class.getSimpleName(),
+            ParDo.of(new ProcessLineToRecordFn())
+                .withOutputTags(outputTag, TupleTagList.of(errorTag)));
+
+    return CsvIOParseResult.of(
+        outputTag, ListCoder.of(NullableCoder.of(StringUtf8Coder.of())), 
errorTag, pct);
   }
 
   /** Processes each line in order to convert it to a {@link CSVRecord}. */
@@ -60,13 +76,24 @@ final class CsvIOStringToCsvRecord
     private final String headerLine = headerLine(csvFormat);
 
     @ProcessElement
-    public void process(@Element String line, OutputReceiver<List<String>> 
receiver)
-        throws IOException {
+    public void process(@Element String line, MultiOutputReceiver receiver) {
       if (headerLine.equals(line)) {
         return;
       }
-      for (CSVRecord record : CSVParser.parse(line, csvFormat).getRecords()) {
-        receiver.output(csvRecordtoList(record));
+      try (CSVParser csvParser = CSVParser.parse(line, csvFormat)) {
+        for (CSVRecord record : csvParser.getRecords()) {
+          receiver.get(outputTag).output(csvRecordtoList(record));
+        }
+      } catch (IOException e) {
+        receiver
+            .get(errorTag)
+            .output(
+                CsvIOParseError.builder()
+                    .setCsvRecord(line)
+                    .setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
+                    .setObservedTimestamp(Instant.now())
+                    .setStackTrace(Throwables.getStackTraceAsString(e))
+                    .build());
       }
     }
   }
diff --git 
a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java
 
b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java
index eb8cacdec5a..9ccb5d0c7bc 100644
--- 
a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java
+++ 
b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java
@@ -35,7 +35,6 @@ import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrim
 import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContaining;
 import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingFromRowFn;
 import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn;
-import static org.junit.Assert.assertThrows;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -55,6 +54,7 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.SerializableUtils;
@@ -135,7 +135,9 @@ public class CsvIORecordToObjectsTest {
             emptyCustomProcessingMap(),
             ROW_ROW_SERIALIZABLE_FUNCTION,
             ALL_PRIMITIVE_DATA_TYPES_ROW_CODER);
-    PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+    CsvIOParseResult<Row> result = input.apply(underTest);
+    PAssert.that(result.getOutput()).containsInAnyOrder(want);
+    PAssert.that(result.getErrors()).empty();
     pipeline.run();
   }
 
@@ -152,7 +154,9 @@ public class CsvIORecordToObjectsTest {
             emptyCustomProcessingMap(),
             allPrimitiveDataTypesFromRowFn(),
             ALL_PRIMITIVE_DATA_TYPES_CODER);
-    PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+    CsvIOParseResult<AllPrimitiveDataTypes> result = input.apply(underTest);
+    PAssert.that(result.getOutput()).containsInAnyOrder(want);
+    PAssert.that(result.getErrors()).empty();
     pipeline.run();
   }
 
@@ -176,7 +180,9 @@ public class CsvIORecordToObjectsTest {
             emptyCustomProcessingMap(),
             ROW_ROW_SERIALIZABLE_FUNCTION,
             NULLABLE_ALL_PRIMITIVE_DATA_TYPES_ROW_CODER);
-    PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+    CsvIOParseResult<Row> result = input.apply(underTest);
+    PAssert.that(result.getOutput()).containsInAnyOrder(want);
+    PAssert.that(result.getErrors()).empty();
     pipeline.run();
   }
 
@@ -193,7 +199,9 @@ public class CsvIORecordToObjectsTest {
             emptyCustomProcessingMap(),
             nullableAllPrimitiveDataTypesFromRowFn(),
             NULLABLE_ALL_PRIMITIVE_DATA_TYPES_CODER);
-    PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+    CsvIOParseResult<NullableAllPrimitiveDataTypes> result = 
input.apply(underTest);
+    PAssert.that(result.getOutput()).containsInAnyOrder(want);
+    PAssert.that(result.getErrors()).empty();
     pipeline.run();
   }
 
@@ -214,8 +222,10 @@ public class CsvIORecordToObjectsTest {
             emptyCustomProcessingMap(),
             allPrimitiveDataTypesFromRowFn(),
             ALL_PRIMITIVE_DATA_TYPES_CODER);
-    input.apply(underTest);
-    assertThrows(Pipeline.PipelineExecutionException.class, pipeline::run);
+    CsvIOParseResult<AllPrimitiveDataTypes> result = input.apply(underTest);
+    PAssert.that(result.getOutput()).empty();
+    
PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L);
+    pipeline.run();
   }
 
   @Test
@@ -228,7 +238,9 @@ public class CsvIORecordToObjectsTest {
             emptyCustomProcessingMap(),
             ROW_ROW_SERIALIZABLE_FUNCTION,
             NULLABLE_ALL_PRIMITIVE_DATA_TYPES_ROW_CODER);
-    PAssert.that(input.apply(underTest)).empty();
+    CsvIOParseResult<Row> result = input.apply(underTest);
+    PAssert.that(result.getOutput()).empty();
+    PAssert.that(result.getErrors()).empty();
     pipeline.run();
   }
 
@@ -242,7 +254,9 @@ public class CsvIORecordToObjectsTest {
             emptyCustomProcessingMap(),
             allPrimitiveDataTypesFromRowFn(),
             ALL_PRIMITIVE_DATA_TYPES_CODER);
-    PAssert.that(input.apply(underTest)).empty();
+    CsvIOParseResult<AllPrimitiveDataTypes> result = input.apply(underTest);
+    PAssert.that(result.getOutput()).empty();
+    PAssert.that(result.getErrors()).empty();
     pipeline.run();
   }
 
@@ -270,7 +284,9 @@ public class CsvIORecordToObjectsTest {
             timeContainingCustomProcessingMap(),
             ROW_ROW_SERIALIZABLE_FUNCTION,
             TIME_CONTAINING_ROW_CODER);
-    PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+    CsvIOParseResult<Row> result = input.apply(underTest);
+    PAssert.that(result.getOutput()).containsInAnyOrder(want);
+    PAssert.that(result.getErrors()).empty();
     pipeline.run();
   }
 
@@ -295,7 +311,9 @@ public class CsvIORecordToObjectsTest {
             timeContainingCustomProcessingMap(),
             timeContainingFromRowFn(),
             TIME_CONTAINING_POJO_CODER);
-    PAssert.that(input.apply(underTest)).containsInAnyOrder(want);
+    CsvIOParseResult<TimeContaining> result = input.apply(underTest);
+    PAssert.that(result.getOutput()).containsInAnyOrder(want);
+    PAssert.that(result.getErrors()).empty();
     pipeline.run();
   }
 
@@ -310,8 +328,11 @@ public class CsvIORecordToObjectsTest {
             emptyCustomProcessingMap(),
             allPrimitiveDataTypesFromRowFn(),
             ALL_PRIMITIVE_DATA_TYPES_CODER);
-    input.apply(underTest);
-    assertThrows(Pipeline.PipelineExecutionException.class, pipeline::run);
+    CsvIOParseResult<AllPrimitiveDataTypes> result = input.apply(underTest);
+    PAssert.that(result.getOutput()).empty();
+    
PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L);
+
+    pipeline.run();
   }
 
   @Test
@@ -328,8 +349,11 @@ public class CsvIORecordToObjectsTest {
             timeContainingCustomProcessingMap(),
             timeContainingFromRowFn(),
             TIME_CONTAINING_POJO_CODER);
-    input.apply(underTest);
-    assertThrows(Pipeline.PipelineExecutionException.class, pipeline::run);
+    CsvIOParseResult<TimeContaining> result = input.apply(underTest);
+    PAssert.that(result.getOutput()).empty();
+    
PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L);
+
+    pipeline.run();
   }
 
   private static PCollection<List<String>> csvRecords(Pipeline pipeline, 
String... cells) {
diff --git 
a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
 
b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
index 1b81391c4fb..7cbba3335dd 100644
--- 
a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
+++ 
b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.sdk.io.csv.CsvIOStringToCsvRecord.headerLine;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -47,12 +48,14 @@ public class CsvIOStringToCsvRecordTest {
             Create.of(headerLine(csvFormat), "#should skip me", "a,1,1.1", 
"b,2,2.2", "c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -65,13 +68,15 @@ public class CsvIOStringToCsvRecordTest {
             Create.of(headerLine(csvFormat), "#comment", "a,1,1.1", "b,2,2.2", 
"c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Collections.singletonList("#comment"),
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -83,12 +88,14 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "a;1;1.1", "b;2;2.2", 
"c;3;3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -100,12 +107,14 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "a$,b,1,1.1", 
"b,2,2.2", "c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a,b", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -117,12 +126,14 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "b,2,2.2", 
"c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -134,12 +145,14 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "", 
"b,2,2.2", "", "c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -151,12 +164,14 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "", 
"b,2,2.2", "", "c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -173,12 +188,14 @@ public class CsvIOStringToCsvRecordTest {
                 "c,3,   3.3         "));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -195,12 +212,14 @@ public class CsvIOStringToCsvRecordTest {
                 "c,3,   3.3         "));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("  a  ", "1", "1.1"),
                 Arrays.asList("b", "        2     ", "2.2"),
                 Arrays.asList("c", "3", "   3.3         ")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -212,12 +231,14 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "a,1,🐼", "b,🐼,2.2", 
"🐼,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", null),
                 Arrays.asList("b", null, "2.2"),
                 Arrays.asList(null, "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -229,12 +250,14 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "a,1,🐼", "b,🐼,2.2", 
"🐼,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "🐼"),
                 Arrays.asList("b", "🐼", "2.2"),
                 Arrays.asList("🐼", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -246,12 +269,15 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), ":a,:,1,1.1", 
"b,2,2.2", "c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a,", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
+
     pipeline.run();
   }
 
@@ -267,12 +293,14 @@ public class CsvIOStringToCsvRecordTest {
                 "\"c\",\"3\",\"3.3\""));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -289,12 +317,14 @@ public class CsvIOStringToCsvRecordTest {
                 "\"c\",\"3\",\"3.3\""));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", null),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -306,12 +336,15 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "\"a,\",1,1.1", 
"b,2,2.2", "c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a,", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
+
     pipeline.run();
   }
 
@@ -323,12 +356,15 @@ public class CsvIOStringToCsvRecordTest {
             Create.of(headerLine(csvFormat), "\"a\",1,1.1", "\"b\",2,2.2", 
"\"c\",3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
+
     pipeline.run();
   }
 
@@ -339,12 +375,15 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "b,2,2.2", 
"c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
+
     pipeline.run();
   }
 
@@ -355,10 +394,13 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), 
"a,1,1.1😆b,2,2.2😆c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Collections.singletonList(
                 Arrays.asList("a", "1", "1.1😆b", "2", "2.2😆c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
+
     pipeline.run();
   }
 
@@ -373,12 +415,15 @@ public class CsvIOStringToCsvRecordTest {
                 "a,1,1.1" + systemRecordSeparator + "b,2,2.2" + 
systemRecordSeparator + "c,3,3.3"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
+
     pipeline.run();
   }
 
@@ -389,12 +434,15 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1,", 
"b,2,2.2,", "c,3,3.3,"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
+
     pipeline.run();
   }
 
@@ -405,12 +453,15 @@ public class CsvIOStringToCsvRecordTest {
         pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1,", 
"b,2,2.2,", "c,3,3.3,"));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1", ""),
                 Arrays.asList("b", "2", "2.2", ""),
                 Arrays.asList("c", "3", "3.3", "")));
+    PAssert.that(result.getErrors()).empty();
+
     pipeline.run();
   }
 
@@ -426,12 +477,14 @@ public class CsvIOStringToCsvRecordTest {
                 "c,3,   3.3         "));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a", "1", "1.1"),
                 Arrays.asList("b", "2", "2.2"),
                 Arrays.asList("c", "3", "3.3")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -448,12 +501,14 @@ public class CsvIOStringToCsvRecordTest {
                 "c,3,   3.3         "));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("  a  ", "1", "1.1"),
                 Arrays.asList("b", "        2     ", "2.2"),
                 Arrays.asList("c", "3", "   3.3         ")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -464,8 +519,10 @@ public class CsvIOStringToCsvRecordTest {
     PCollection<String> input = pipeline.apply(Create.of(csvRecord));
 
     CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat());
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", 
"1")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }
@@ -478,12 +535,14 @@ public class CsvIOStringToCsvRecordTest {
 
     CsvIOStringToCsvRecord underTest =
         new CsvIOStringToCsvRecord(csvFormat().withRecordSeparator('\n'));
-    PAssert.that(input.apply(underTest))
+    CsvIOParseResult<List<String>> result = input.apply(underTest);
+    PAssert.that(result.getOutput())
         .containsInAnyOrder(
             Arrays.asList(
                 Arrays.asList("a\r\n1", "a\r\n2"),
                 Arrays.asList("b\r\n1", "b\r\n2"),
                 Arrays.asList("c\r\n1", "c\r\n2")));
+    PAssert.that(result.getErrors()).empty();
 
     pipeline.run();
   }


Reply via email to