This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ea761115998 [CsvIO] Create CsvIOStringToCsvRecord Class (#31857)
ea761115998 is described below
commit ea7611159989c64e2190604f5c50e9457c4ca2d8
Author: lahariguduru <[email protected]>
AuthorDate: Mon Jul 15 21:50:53 2024 +0000
[CsvIO] Create CsvIOStringToCsvRecord Class (#31857)
* Create CsvIOStringToCsvRecord class
* Create CsvIOStringToCsvRecord Class
* Create CsvIOStringToCsvRecord Class
* Create CsvIOStringToCsvRecord Class
* Fixed BadRecord Output
* Make class final
---------
Co-authored-by: Lahari Guduru <[email protected]>
---
.../beam/sdk/io/csv/CsvIOStringToCsvRecord.java | 61 +++++++++
.../sdk/io/csv/CsvIOStringToCsvRecordTest.java | 143 +++++++++++++++++++++
2 files changed, 204 insertions(+)
diff --git
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
new file mode 100644
index 00000000000..995052bf7f7
--- /dev/null
+++
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * {@link CsvIOStringToCsvRecord} is a {@link PTransform} that parses each element of a {@code
+ * PCollection<String>} with a configured {@link CSVFormat} and outputs one {@code
+ * Iterable<String>} per {@link CSVRecord}, containing that record's values in column order.
+ *
+ * <p>Every input element is parsed independently: one element may hold several records
+ * (including quoted values that span lines) and produces zero or more outputs. Format options
+ * that act on the start of the parsed text, such as {@code withSkipHeaderRecord()}, are therefore
+ * applied to each element separately.
+ *
+ * <p>No separate error output is produced; exceptions thrown while parsing are not caught here
+ * and propagate out of the {@link DoFn}.
+ */
+final class CsvIOStringToCsvRecord
+    extends PTransform<PCollection<String>, PCollection<Iterable<String>>> {
+  private final CSVFormat csvFormat;
+
+  CsvIOStringToCsvRecord(CSVFormat csvFormat) {
+    this.csvFormat = csvFormat;
+  }
+
+  /**
+   * Parses every element of {@code input} into {@code Iterable<String>} records for future
+   * processing to Row or custom type.
+   */
+  @Override
+  public PCollection<Iterable<String>> expand(PCollection<String> input) {
+    return input.apply(ParDo.of(new ProcessLineToRecordFn(csvFormat)));
+  }
+
+  /**
+   * Converts one input element into zero or more records. Declared {@code static} so the
+   * serialized {@link DoFn} carries only its {@link CSVFormat}, not a hidden reference to the
+   * enclosing transform.
+   */
+  private static final class ProcessLineToRecordFn extends DoFn<String, Iterable<String>> {
+    private final CSVFormat csvFormat;
+
+    ProcessLineToRecordFn(CSVFormat csvFormat) {
+      this.csvFormat = csvFormat;
+    }
+
+    @ProcessElement
+    public void process(@Element String line, OutputReceiver<Iterable<String>> receiver)
+        throws IOException {
+      // try-with-resources closes the parser (and the reader it wraps) even when parsing fails.
+      try (CSVParser parser = CSVParser.parse(line, csvFormat)) {
+        for (CSVRecord record : parser.getRecords()) {
+          receiver.output(toList(record));
+        }
+      }
+    }
+
+    /**
+     * Copies a record's values, in column order, into a plain list. The emitted element then has
+     * {@link List} value equality and does not keep the {@link CSVRecord} (which may reference
+     * its parser) alive downstream.
+     */
+    private static List<String> toList(CSVRecord record) {
+      List<String> values = new ArrayList<>(record.size());
+      for (String value : record) {
+        values.add(value);
+      }
+      return values;
+    }
+  }
+}
diff --git
a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
new file mode 100644
index 00000000000..44db791cbee
--- /dev/null
+++
b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Tests for {@link CsvIOStringToCsvRecord}. */
+public class CsvIOStringToCsvRecordTest {
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testSingleLineCsvRecord() {
+    String csvRecord = "a,1";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecord));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", "1")));
+
+    pipeline.run();
+  }
+
+  /** Quoted values may contain line breaks; one element then yields several records. */
+  @Test
+  public void testMultiLineCsvRecord() {
+    String csvRecords =
+        "\"a\r\n1\",\"a\r\n2\"" + "\n" + "\"b\r\n1\",\"b\r\n2\"" + "\n" + "\"c\r\n1\",\"c\r\n2\"";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+
+    // The record separator only affects printing, so the default format parses '\n' as well.
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a\r\n1", "a\r\n2"),
+                Arrays.asList("b\r\n1", "b\r\n2"),
+                Arrays.asList("c\r\n1", "c\r\n2")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testCsvRecordsWithSkipHeaderRecord() {
+    String csvRecords = "a_string,an_integer\na,1\nb,2\n";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withSkipHeaderRecord());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Arrays.asList(Arrays.asList("a", "1"), Arrays.asList("b", "2")));
+
+    pipeline.run();
+  }
+
+  /** Each element is parsed on its own, so the header record is skipped in every element. */
+  @Test
+  public void testSkipHeaderRecordAppliesToEachElement() {
+    PCollection<String> input =
+        pipeline.apply(Create.of("a_string,an_integer\na,1", "a_string,an_integer\nb,2"));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withSkipHeaderRecord());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Arrays.asList(Arrays.asList("a", "1"), Arrays.asList("b", "2")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testCsvRecordsWithCommentMarker() {
+    String csvRecords = "#leaving a comment\n" + "a,1,1.1\nb,2,2.2\nc,3,3.3";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withCommentMarker('#'));
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Arrays.asList("a", "1", "1.1"),
+                Arrays.asList("b", "2", "2.2"),
+                Arrays.asList("c", "3", "3.3")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testCsvRecordsWithIgnoreEmptyLines() {
+    String csvRecords = "line1\nline2\nline3\nline4\nline5\n\n\nline6";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecords));
+
+    // NOTE: CSVFormat.DEFAULT already ignores empty lines; this pins that the option keeps them
+    // out of the output.
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withIgnoreEmptyLines());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(
+            Arrays.asList(
+                Collections.singletonList("line1"),
+                Collections.singletonList("line2"),
+                Collections.singletonList("line3"),
+                Collections.singletonList("line4"),
+                Collections.singletonList("line5"),
+                Collections.singletonList("line6")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testCsvRecordWithIgnoreSurroundingSpaces() {
+    String csvRecord = " Seattle , WA ";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecord));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withIgnoreSurroundingSpaces());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Collections.singletonList(Arrays.asList("Seattle", "WA")));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testCsvRecordWithTrailingDelimiter() {
+    String csvRecord = "a,b,c,";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecord));
+
+    CsvIOStringToCsvRecord underTest =
+        new CsvIOStringToCsvRecord(csvFormat().withTrailingDelimiter());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", "b", "c")));
+
+    pipeline.run();
+  }
+
+  /** A delimiter inside a quoted value is part of the value, not a column boundary. */
+  @Test
+  public void testCsvRecordWithQuotedDelimiter() {
+    String csvRecord = "\"a,b\",1";
+    PCollection<String> input = pipeline.apply(Create.of(csvRecord));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat());
+    PAssert.that(input.apply(underTest))
+        .containsInAnyOrder(Collections.singletonList(Arrays.asList("a,b", "1")));
+
+    pipeline.run();
+  }
+
+  /** An element with no CSV content produces no output records. */
+  @Test
+  public void testEmptyElementProducesNoRecords() {
+    PCollection<String> input = pipeline.apply(Create.of(""));
+
+    CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat());
+    PAssert.that(input.apply(underTest)).empty();
+
+    pipeline.run();
+  }
+
+  /** Base format shared by the tests: {@link CSVFormat#DEFAULT} with an explicit header. */
+  private static CSVFormat csvFormat() {
+    return CSVFormat.DEFAULT.withHeader("a_string", "an_integer", "a_double");
+  }
+}