This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new eadb81fd01f Expand test coverage (#31957)
eadb81fd01f is described below

commit eadb81fd01f281755f18a8a3095e3c2c8194e9fc
Author: Francis O'Hara <[email protected]>
AuthorDate: Tue Jul 23 19:18:45 2024 +0000

    Expand test coverage (#31957)
    
    Co-authored-by: Lahari Guduru <[email protected]>
---
 .../java/org/apache/beam/sdk/io/csv/CsvIO.java     |   3 +
 .../apache/beam/sdk/io/csv/CsvIOParseHelpers.java  |   4 +
 .../beam/sdk/io/csv/CsvIOStringToCsvRecord.java    |  17 +-
 .../beam/sdk/io/csv/CsvIOParseHelpersTest.java     |  12 +
 .../sdk/io/csv/CsvIOStringToCsvRecordTest.java     | 445 ++++++++++++++++++---
 5 files changed, 433 insertions(+), 48 deletions(-)

diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
index 6d940f7d96d..04141e5c677 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
@@ -74,6 +74,9 @@ import org.apache.commons.csv.CSVFormat;
  *   <li>{@code boolean} <a
  *       href="https://javadoc.io/static/org.apache.commons/commons-csv/1.8/org/apache/commons/csv/CSVFormat.html#withIgnoreHeaderCase--">ignoreHeaderCase</a>
  *       - must be false.
+ *   <li>{@code boolean} <a
+ *       href="https://javadoc.io/static/org.apache.commons/commons-csv/1.8/org/apache/commons/csv/CSVFormat.html#withSkipHeaderRecord--">skipHeaderRecord</a>
+ *       - must be false. The header is already accounted for during parsing.
  * </ul>
  *
  * <h4>Ignored CSVFormat parameters</h4>
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
index 856ccf42d84..4e4102f0efb 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
@@ -62,6 +62,10 @@ final class CsvIOParseHelpers {
           "Illegal %s: column name is required",
           CSVFormat.class);
     }
+    checkArgument(
+        !format.getSkipHeaderRecord(),
+        "Illegal %s: cannot skip header record because the header is already 
accounted for",
+        CSVFormat.class);
   }
 
   /**
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
index c92961f94a9..b5ce6a0fec2 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
@@ -20,6 +20,9 @@ package org.apache.beam.sdk.io.csv;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -47,14 +50,21 @@ final class CsvIOStringToCsvRecord
    */
   @Override
   public PCollection<List<String>> expand(PCollection<String> input) {
-    return input.apply(ParDo.of(new ProcessLineToRecordFn()));
+    return input
+        .apply(ParDo.of(new ProcessLineToRecordFn()))
+        .setCoder(ListCoder.of(NullableCoder.of(StringUtf8Coder.of())));
   }
 
   /** Processes each line in order to convert it to a {@link CSVRecord}. */
   private class ProcessLineToRecordFn extends DoFn<String, List<String>> {
+    private final String headerLine = headerLine(csvFormat);
+
     @ProcessElement
     public void process(@Element String line, OutputReceiver<List<String>> 
receiver)
         throws IOException {
+      if (headerLine.equals(line)) {
+        return;
+      }
       for (CSVRecord record : CSVParser.parse(line, csvFormat).getRecords()) {
         receiver.output(csvRecordtoList(record));
       }
@@ -69,4 +79,9 @@ final class CsvIOStringToCsvRecord
     }
     return cells;
   }
+
+  /** Returns a formatted line of the CSVFormat header. */
+  static String headerLine(CSVFormat csvFormat) {
+    return String.join(String.valueOf(csvFormat.getDelimiter()), 
csvFormat.getHeader());
+  }
 }
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
index 5a387652022..97374cf52fe 100644
--- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
@@ -121,6 +121,18 @@ public class CsvIOParseHelpersTest {
         gotMessage);
   }
 
+  @Test
+  public void givenCSVFormatThatSkipsHeaderRecord_throwsException() {
+    CSVFormat format = csvFormatWithHeader().withSkipHeaderRecord(true);
+    String gotMessage =
+        assertThrows(
+                IllegalArgumentException.class, () -> 
CsvIOParseHelpers.validateCsvFormat(format))
+            .getMessage();
+    assertEquals(
+        "Illegal class org.apache.commons.csv.CSVFormat: cannot skip header 
record because the header is already accounted for",
+        gotMessage);
+  }
+
   /** End of tests for {@link CsvIOParseHelpers#validateCsvFormat(CSVFormat)}. */
   //////////////////////////////////////////////////////////////////////////////////////////////
 
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
index 44db791cbee..1b81391c4fb 100644
--- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.csv;
 
+import static org.apache.beam.sdk.io.csv.CsvIOStringToCsvRecord.headerLine;
+
 import java.util.Arrays;
 import java.util.Collections;
 import org.apache.beam.sdk.testing.PAssert;
@@ -24,63 +26,97 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
 
 /** Tests for {@link CsvIOStringToCsvRecord}. */
+@RunWith(JUnit4.class)
 public class CsvIOStringToCsvRecordTest {
   @Rule public final TestPipeline pipeline = TestPipeline.create();
 
+  private static final String[] header = {"a_string", "an_integer", 
"a_double"};
+
   @Test
-  public void testSingleLineCsvRecord() {
-    String csvRecord = "a,1";
-    PCollection<String> input = pipeline.apply(Create.of(csvRecord));
+  public void givenCommentMarker_skipsLine() {
+    CSVFormat csvFormat = csvFormat().withCommentMarker('#');
+    PCollection<String> input =
+        pipeline.apply(
+            Create.of(headerLine(csvFormat), "#should skip me", "a,1,1.1", 
"b,2,2.2", "c,3,3.3"));
 
-    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat());
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
     PAssert.that(input.apply(underTest))
-        .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", 
"1")));
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
 
     pipeline.run();
   }
 
   @Test
-  public void testMultiLineCsvRecord() {
-    String csvRecords =
-        "\"a\r\n1\",\"a\r\n2\"" + "\n" + "\"b\r\n1\",\"b\r\n2\"" + "\n" + 
"\"c\r\n1\",\"c\r\n2\"";
-    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+  public void givenNoCommentMarker_doesntSkipLine() {
+    CSVFormat csvFormat = csvFormat();
+    PCollection<String> input =
+        pipeline.apply(
+            Create.of(headerLine(csvFormat), "#comment", "a,1,1.1", "b,2,2.2", 
"c,3,3.3"));
 
-    CsvIOStringToCsvRecord underTest =
-        new CsvIOStringToCsvRecord(csvFormat().withRecordSeparator('\n'));
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
     PAssert.that(input.apply(underTest))
         .containsInAnyOrder(
             Arrays.asList(
-                Arrays.asList("a\r\n1", "a\r\n2"),
-                Arrays.asList("b\r\n1", "b\r\n2"),
-                Arrays.asList("c\r\n1", "c\r\n2")));
+                Collections.singletonList("#comment"),
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
 
     pipeline.run();
   }
 
   @Test
-  public void testCsvRecordsWithSkipHeaderRecord() {
-    String csvRecords = "a_string,an_integer\na,1\nb,2\n";
-    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+  public void givenCustomDelimiter_splitsCells() {
+    CSVFormat csvFormat = csvFormat().withDelimiter(';');
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "a;1;1.1", "b;2;2.2", 
"c;3;3.3"));
 
-    CsvIOStringToCsvRecord underTest =
-        new CsvIOStringToCsvRecord(csvFormat().withSkipHeaderRecord());
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void givenEscapeCharacter_includeInCell() {
+    CSVFormat csvFormat = csvFormat().withEscape('$');
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "a$,b,1,1.1", 
"b,2,2.2", "c,3,3.3"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
     PAssert.that(input.apply(underTest))
-        .containsInAnyOrder(Arrays.asList(Arrays.asList("a", "1"), 
Arrays.asList("b", "2")));
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a,b", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
 
     pipeline.run();
   }
 
   @Test
-  public void testCsvRecordsWithCommentMarker() {
-    String csvRecords = "#leaving a comment\n" + "a,1,1.1\nb,2,2.2\nc,3,3.3";
-    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+  public void givenHeaderComment_isNoop() {
+    CSVFormat csvFormat = csvFormat().withHeaderComments("abc", "def", "xyz");
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "b,2,2.2", 
"c,3,3.3"));
 
-    CsvIOStringToCsvRecord underTest =
-        new CsvIOStringToCsvRecord(csvFormat().withCommentMarker('#'));
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
     PAssert.that(input.apply(underTest))
         .containsInAnyOrder(
             Arrays.asList(
@@ -92,52 +128,367 @@ public class CsvIOStringToCsvRecordTest {
   }
 
   @Test
-  public void testCsvRecordsWithIgnoreEmptyLines() {
-    String csvRecords = "line1\nline2\nline3\nline4\nline5\n\n\nline6";
-    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+  public void givenIgnoreEmptyLines_shouldSkip() {
+    CSVFormat csvFormat = csvFormat().withIgnoreEmptyLines(true);
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "", 
"b,2,2.2", "", "c,3,3.3"));
 
-    CsvIOStringToCsvRecord underTest =
-        new CsvIOStringToCsvRecord(csvFormat().withIgnoreEmptyLines());
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
     PAssert.that(input.apply(underTest))
         .containsInAnyOrder(
             Arrays.asList(
-                Collections.singletonList("line1"),
-                Collections.singletonList("line2"),
-                Collections.singletonList("line3"),
-                Collections.singletonList("line4"),
-                Collections.singletonList("line5"),
-                Collections.singletonList("line6")));
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
 
     pipeline.run();
   }
 
   @Test
-  public void testCsvRecordWithIgnoreSurroundingSpaces() {
-    String csvRecord = "    Seattle     ,   WA   ";
-    PCollection<String> input = pipeline.apply(Create.of(csvRecord));
+  public void givenNoIgnoreEmptyLines_isNoop() {
+    CSVFormat csvFormat = csvFormat().withIgnoreEmptyLines(false);
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "", 
"b,2,2.2", "", "c,3,3.3"));
 
-    CsvIOStringToCsvRecord underTest =
-        new CsvIOStringToCsvRecord(csvFormat().withIgnoreSurroundingSpaces());
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void givenIgnoreSurroundingSpaces_removesSpaces() {
+    CSVFormat csvFormat = csvFormat().withIgnoreSurroundingSpaces(true);
+    PCollection<String> input =
+        pipeline.apply(
+            Create.of(
+                headerLine(csvFormat),
+                "  a  ,1,1.1",
+                "b,        2     ,2.2",
+                "c,3,   3.3         "));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void givenNotIgnoreSurroundingSpaces_keepsSpaces() {
+    CSVFormat csvFormat = csvFormat().withIgnoreSurroundingSpaces(false);
+    PCollection<String> input =
+        pipeline.apply(
+            Create.of(
+                headerLine(csvFormat),
+                "  a  ,1,1.1",
+                "b,        2     ,2.2",
+                "c,3,   3.3         "));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("  a  ", "1", "1.1"),
+                Arrays.asList("b", "        2     ", "2.2"),
+                Arrays.asList("c", "3", "   3.3         ")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void givenNullString_parsesNullCells() {
+    CSVFormat csvFormat = csvFormat().withNullString("🐼");
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "a,1,🐼", "b,🐼,2.2", 
"🐼,3,3.3"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", null),
+                Arrays.asList("b", null, "2.2"),
+                Arrays.asList(null, "3", "3.3")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void givenNoNullString_isNoop() {
+    CSVFormat csvFormat = csvFormat();
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "a,1,🐼", "b,🐼,2.2", 
"🐼,3,3.3"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "🐼"),
+                Arrays.asList("b", "🐼", "2.2"),
+                Arrays.asList("🐼", "3", "3.3")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void givenCustomQuoteCharacter_includesSpecialCharacters() {
+    CSVFormat csvFormat = csvFormat().withQuote(':');
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), ":a,:,1,1.1", 
"b,2,2.2", "c,3,3.3"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a,", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+    pipeline.run();
+  }
+
+  @Test
+  public void givenQuoteModeAll_isNoop() {
+    CSVFormat csvFormat = csvFormat().withQuoteMode(QuoteMode.ALL);
+    PCollection<String> input =
+        pipeline.apply(
+            Create.of(
+                headerLine(csvFormat),
+                "\"a\",\"1\",\"1.1\"",
+                "\"b\",\"2\",\"2.2\"",
+                "\"c\",\"3\",\"3.3\""));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
     PAssert.that(input.apply(underTest))
-        .containsInAnyOrder(Collections.singletonList(Arrays.asList("Seattle", 
"WA")));
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
 
     pipeline.run();
   }
 
   @Test
-  public void testCsvRecordWithTrailingDelimiter() {
-    String csvRecord = "a,b,c,";
+  public void givenQuoteModeAllNonNull_isNoop() {
+    CSVFormat csvFormat = 
csvFormat().withNullString("N/A").withQuoteMode(QuoteMode.ALL_NON_NULL);
+    PCollection<String> input =
+        pipeline.apply(
+            Create.of(
+                headerLine(csvFormat),
+                "\"a\",\"1\",N/A",
+                "\"b\",\"2\",\"2.2\"",
+                "\"c\",\"3\",\"3.3\""));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", null),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void givenQuoteModeMinimal_isNoop() {
+    CSVFormat csvFormat = csvFormat().withQuoteMode(QuoteMode.MINIMAL);
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "\"a,\",1,1.1", 
"b,2,2.2", "c,3,3.3"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a,", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+    pipeline.run();
+  }
+
+  @Test
+  public void givenQuoteModeNonNumeric_isNoop() {
+    CSVFormat csvFormat = csvFormat().withQuoteMode(QuoteMode.NON_NUMERIC);
+    PCollection<String> input =
+        pipeline.apply(
+            Create.of(headerLine(csvFormat), "\"a\",1,1.1", "\"b\",2,2.2", 
"\"c\",3,3.3"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+    pipeline.run();
+  }
+
+  @Test
+  public void givenQuoteModeNone_isNoop() {
+    CSVFormat csvFormat = 
csvFormat().withEscape('$').withQuoteMode(QuoteMode.NONE);
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "b,2,2.2", 
"c,3,3.3"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+    pipeline.run();
+  }
+
+  @Test
+  public void givenCustomRecordSeparator_isNoop() {
+    CSVFormat csvFormat = csvFormat().withRecordSeparator("😆");
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), 
"a,1,1.1😆b,2,2.2😆c,3,3.3"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Collections.singletonList(
+                Arrays.asList("a", "1", "1.1😆b", "2", "2.2😆c", "3", "3.3")));
+    pipeline.run();
+  }
+
+  @Test
+  public void givenSystemRecordSeparator_isNoop() {
+    CSVFormat csvFormat = csvFormat().withSystemRecordSeparator();
+    String systemRecordSeparator = csvFormat.getRecordSeparator();
+    PCollection<String> input =
+        pipeline.apply(
+            Create.of(
+                headerLine(csvFormat),
+                "a,1,1.1" + systemRecordSeparator + "b,2,2.2" + 
systemRecordSeparator + "c,3,3.3"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+    pipeline.run();
+  }
+
+  @Test
+  public void givenTrailingDelimiter_skipsEndingDelimiter() {
+    CSVFormat csvFormat = csvFormat().withTrailingDelimiter(true);
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1,", 
"b,2,2.2,", "c,3,3.3,"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+    pipeline.run();
+  }
+
+  @Test
+  public void givenNoTrailingDelimiter_includesEndingCell() {
+    CSVFormat csvFormat = csvFormat().withTrailingDelimiter(false);
+    PCollection<String> input =
+        pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1,", 
"b,2,2.2,", "c,3,3.3,"));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1", ""),
+                Arrays.asList("b", "2", "2.2", ""),
+                Arrays.asList("c", "3", "3.3", "")));
+    pipeline.run();
+  }
+
+  @Test
+  public void givenTrim_removesSpaces() {
+    CSVFormat csvFormat = csvFormat().withTrim(true);
+    PCollection<String> input =
+        pipeline.apply(
+            Create.of(
+                headerLine(csvFormat),
+                "  a  ,1,1.1",
+                "b,        2     ,2.2",
+                "c,3,   3.3         "));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void givenNoTrim_keepsSpaces() {
+    CSVFormat csvFormat = csvFormat().withTrim(false);
+    PCollection<String> input =
+        pipeline.apply(
+            Create.of(
+                headerLine(csvFormat),
+                "  a  ,1,1.1",
+                "b,        2     ,2.2",
+                "c,3,   3.3         "));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat);
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("  a  ", "1", "1.1"),
+                Arrays.asList("b", "        2     ", "2.2"),
+                Arrays.asList("c", "3", "   3.3         ")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testSingleLineCsvRecord() {
+    String csvRecord = "a,1";
     PCollection<String> input = pipeline.apply(Create.of(csvRecord));
 
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", 
"1")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testMultiLineCsvRecord() {
+    String csvRecords =
+        "\"a\r\n1\",\"a\r\n2\"" + "\n" + "\"b\r\n1\",\"b\r\n2\"" + "\n" + 
"\"c\r\n1\",\"c\r\n2\"";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+
     CsvIOStringToCsvRecord underTest =
-        new CsvIOStringToCsvRecord(csvFormat().withTrailingDelimiter());
+        new CsvIOStringToCsvRecord(csvFormat().withRecordSeparator('\n'));
     PAssert.that(input.apply(underTest))
-        .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", "b", 
"c")));
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a\r\n1", "a\r\n2"),
+                Arrays.asList("b\r\n1", "b\r\n2"),
+                Arrays.asList("c\r\n1", "c\r\n2")));
 
     pipeline.run();
   }
 
   private static CSVFormat csvFormat() {
-    return CSVFormat.DEFAULT.withHeader("a_string", "an_integer", "a_double");
+    return 
CSVFormat.DEFAULT.withAllowDuplicateHeaderNames(false).withHeader(header);
   }
 }

Reply via email to