This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ea761115998 [CsvIO] Create CsvIOStringToCsvRecord Class (#31857)
ea761115998 is described below

commit ea7611159989c64e2190604f5c50e9457c4ca2d8
Author: lahariguduru <[email protected]>
AuthorDate: Mon Jul 15 21:50:53 2024 +0000

    [CsvIO] Create CsvIOStringToCsvRecord Class (#31857)
    
    * Create CsvIOStringToCsvRecord class
    
    * Create CsvIOStringToCsvRecord Class
    
    * Create CsvIOStringToCsvRecord Class
    
    * Create CsvIOStringToCsvRecord Class
    
    * Fixed BadRecord Output
    
    * Make class final
    
    ---------
    
    Co-authored-by: Lahari Guduru <[email protected]>
---
 .../beam/sdk/io/csv/CsvIOStringToCsvRecord.java    |  61 +++++++++
 .../sdk/io/csv/CsvIOStringToCsvRecordTest.java     | 143 +++++++++++++++++++++
 2 files changed, 204 insertions(+)

diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
new file mode 100644
index 00000000000..995052bf7f7
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.io.IOException;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * Parses each {@link String} element of the input {@link PCollection} with a configured {@link
+ * CSVFormat} and emits, for every {@link CSVRecord} found, an {@link Iterable} of its column values.
+ *
+ * <p>A single input element may contain several records (for example newline-separated lines, or
+ * quoted values that span line breaks); each record becomes a separate output element. Header,
+ * comment-marker and empty-line handling are delegated entirely to the supplied {@link CSVFormat}.
+ */
+final class CsvIOStringToCsvRecord
+    extends PTransform<PCollection<String>, PCollection<Iterable<String>>> {
+  private final CSVFormat csvFormat;
+
+  CsvIOStringToCsvRecord(CSVFormat csvFormat) {
+    this.csvFormat = csvFormat;
+  }
+
+  /**
+   * Parses every input string into zero or more CSV records, each emitted as its column values for
+   * later conversion to {@code Row} or a custom type.
+   */
+  @Override
+  public PCollection<Iterable<String>> expand(PCollection<String> input) {
+    return input
+        .apply(ParDo.of(new ProcessLineToRecordFn(csvFormat)))
+        // Column values can be null when the format defines a null string; the inferred
+        // StringUtf8Coder rejects nulls, so wrap it in NullableCoder explicitly.
+        .setCoder(IterableCoder.of(NullableCoder.of(StringUtf8Coder.of())));
+  }
+
+  /**
+   * Parses one input string and outputs each resulting {@link CSVRecord} as its column values.
+   *
+   * <p>Declared static so that serializing this DoFn does not also capture the enclosing
+   * transform.
+   */
+  private static final class ProcessLineToRecordFn extends DoFn<String, Iterable<String>> {
+    private final CSVFormat csvFormat;
+
+    ProcessLineToRecordFn(CSVFormat csvFormat) {
+      this.csvFormat = csvFormat;
+    }
+
+    @ProcessElement
+    public void process(@Element String line, OutputReceiver<Iterable<String>> receiver)
+        throws IOException {
+      // CSVParser is Closeable; release it deterministically even if parsing fails. All records
+      // are read before any output so a malformed element emits nothing.
+      try (CSVParser parser = CSVParser.parse(line, csvFormat)) {
+        for (CSVRecord record : parser.getRecords()) {
+          receiver.output(record);
+        }
+      }
+    }
+  }
+}
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
new file mode 100644
index 00000000000..44db791cbee
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Tests for {@link CsvIOStringToCsvRecord}. */
+public class CsvIOStringToCsvRecordTest {
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testSingleLineCsvRecord() {
+    String csvRecord = "a,1";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecord));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", 
"1")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testMultiLineCsvRecord() {
+    String csvRecords =
+        "\"a\r\n1\",\"a\r\n2\"" + "\n" + "\"b\r\n1\",\"b\r\n2\"" + "\n" + 
"\"c\r\n1\",\"c\r\n2\"";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withRecordSeparator('\n'));
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a\r\n1", "a\r\n2"),
+                Arrays.asList("b\r\n1", "b\r\n2"),
+                Arrays.asList("c\r\n1", "c\r\n2")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testCsvRecordsWithSkipHeaderRecord() {
+    String csvRecords = "a_string,an_integer\na,1\nb,2\n";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withSkipHeaderRecord());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Arrays.asList(Arrays.asList("a", "1"), 
Arrays.asList("b", "2")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testCsvRecordsWithCommentMarker() {
+    String csvRecords = "#leaving a comment\n" + "a,1,1.1\nb,2,2.2\nc,3,3.3";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withCommentMarker('#'));
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testCsvRecordsWithIgnoreEmptyLines() {
+    String csvRecords = "line1\nline2\nline3\nline4\nline5\n\n\nline6";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withIgnoreEmptyLines());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Collections.singletonList("line1"),
+                Collections.singletonList("line2"),
+                Collections.singletonList("line3"),
+                Collections.singletonList("line4"),
+                Collections.singletonList("line5"),
+                Collections.singletonList("line6")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testCsvRecordWithIgnoreSurroundingSpaces() {
+    String csvRecord = "    Seattle     ,   WA   ";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecord));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withIgnoreSurroundingSpaces());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Collections.singletonList(Arrays.asList("Seattle", 
"WA")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testCsvRecordWithTrailingDelimiter() {
+    String csvRecord = "a,b,c,";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecord));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withTrailingDelimiter());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", "b", 
"c")));
+
+    pipeline.run();
+  }
+
+  private static CSVFormat csvFormat() {
+    return CSVFormat.DEFAULT.withHeader("a_string", "an_integer", "a_double");
+  }
+}

Reply via email to