This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fc5a71db5ca [CsvIO]: Implement CsvIOParse::withCustomRecordParsing method (#32142)
fc5a71db5ca is described below
commit fc5a71db5caa95fd14988bfe475c240873216a2c
Author: Francis O'Hara <[email protected]>
AuthorDate: Fri Aug 9 23:12:22 2024 +0000
[CsvIO]: Implement CsvIOParse::withCustomRecordParsing method (#32142)
* completed implementation without tests
Co-authored-by: Lahari Guduru <[email protected]>
* intermediate stage
Co-authored-by: Lahari Guduru <[email protected]>
* Implement CsvIOParse.withCustomRecordParsing
Co-authored-by: Lahari Guduru <[email protected]>
---------
Co-authored-by: Lahari Guduru <[email protected]>
---
.../org/apache/beam/sdk/io/csv/CsvIOParse.java | 28 ++++-
.../beam/sdk/io/csv/CsvIOParseConfiguration.java | 12 +-
.../org/apache/beam/sdk/io/csv/CsvIOParseTest.java | 127 ++++++++++++++++++++-
3 files changed, 158 insertions(+), 9 deletions(-)
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java
index 0a27cdbc57e..5981e813276 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.NonNull;
/**
* {@link PTransform} for Parsing CSV Record Strings into {@link
Schema}-mapped target types. {@link
@@ -43,9 +44,30 @@ public abstract class CsvIOParse<T> extends PTransform<PCollection<String>, CsvI
return new AutoValue_CsvIOParse.Builder<>();
}
- // TODO(https://github.com/apache/beam/issues/31875): Implement in future PR.
- public CsvIOParse<T> withCustomRecordParsing(
- Map<String, SerializableFunction<String, Object>> customProcessingMap) {
+ /**
+ * Configures custom cell parsing.
+ *
+ * <h2>Example</h2>
+ *
+ * <pre>{@code
+ * CsvIO.parse().withCustomRecordParsing("listOfInts", cell-> {
+ *
+ * List<Integer> result = new ArrayList<>();
+ * for (String stringValue: Splitter.on(";").split(cell)) {
+ * result.add(Integer.parseInt(stringValue));
+ * }
+ *
+ * });
+ * }</pre>
+ */
+ public <OutputT extends @NonNull Object> CsvIOParse<T>
withCustomRecordParsing(
+ String fieldName, SerializableFunction<String, OutputT>
customRecordParsingFn) {
+
+ Map<String, SerializableFunction<String, Object>> customProcessingMap =
+ getConfigBuilder().getOrCreateCustomProcessingMap();
+
+ customProcessingMap.put(fieldName, customRecordParsingFn::apply);
+ getConfigBuilder().setCustomProcessingMap(customProcessingMap);
return this;
}
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
index dd9ef5b3486..2be871a9dc2 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
@@ -60,18 +60,26 @@ abstract class CsvIOParseConfiguration<T> implements Serializable {
abstract Builder<T> setCustomProcessingMap(
Map<String, SerializableFunction<String, Object>> customProcessingMap);
+ abstract Optional<Map<String, SerializableFunction<String, Object>>>
getCustomProcessingMap();
+
+ final Map<String, SerializableFunction<String, Object>>
getOrCreateCustomProcessingMap() {
+ if (!getCustomProcessingMap().isPresent()) {
+ setCustomProcessingMap(new HashMap<>());
+ }
+ return getCustomProcessingMap().get();
+ }
+
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setFromRowFn(SerializableFunction<Row, T> fromRowFn);
- abstract Optional<Map<String, SerializableFunction<String, Object>>>
getCustomProcessingMap();
-
abstract CsvIOParseConfiguration<T> autoBuild();
final CsvIOParseConfiguration<T> build() {
if (!getCustomProcessingMap().isPresent()) {
setCustomProcessingMap(new HashMap<>());
}
+
return autoBuild();
}
}
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java
index 05d6982004f..a517cef3d51 100644
--- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java
@@ -19,10 +19,17 @@ package org.apache.beam.sdk.io.csv;
import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR;
+import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA;
+import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_TYPE_DESCRIPTOR;
+import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TimeContaining;
import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypes;
import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesFromRowFn;
import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesToRowFn;
+import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContaining;
+import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingFromRowFn;
+import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -38,17 +45,22 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.commons.csv.CSVFormat;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+/** Tests for {@link CsvIOParse}. */
@RunWith(JUnit4.class)
public class CsvIOParseTest {
@@ -61,6 +73,12 @@ public class CsvIOParseTest {
NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR,
nullableAllPrimitiveDataTypesToRowFn(),
nullableAllPrimitiveDataTypesFromRowFn());
+ private static final Coder<TimeContaining> TIME_CONTAINING_CODER =
+ SchemaCoder.of(
+ TIME_CONTAINING_SCHEMA,
+ TIME_CONTAINING_TYPE_DESCRIPTOR,
+ timeContainingToRowFn(),
+ timeContainingFromRowFn());
private static final SerializableFunction<Row, Row>
ROW_ROW_SERIALIZABLE_FUNCTION = row -> row;
@Rule public final TestPipeline pipeline = TestPipeline.create();
@@ -120,7 +138,7 @@ public class CsvIOParseTest {
underTest(
NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA,
csvFormat(),
- emptyCustomProcessingMap(),
+ new HashMap<>(),
ROW_ROW_SERIALIZABLE_FUNCTION,
RowCoder.of(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)));
PAssert.that(result.getOutput()).containsInAnyOrder(want);
@@ -152,7 +170,7 @@ public class CsvIOParseTest {
underTest(
NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA,
csvFormat(),
- emptyCustomProcessingMap(),
+ new HashMap<>(),
nullableAllPrimitiveDataTypesFromRowFn(),
NULLABLE_ALL_PRIMITIVE_DATA_TYPES_CODER));
PAssert.that(result.getOutput()).containsInAnyOrder(want);
@@ -161,6 +179,98 @@ public class CsvIOParseTest {
pipeline.run();
}
+ @Test
+ public void givenSingleCustomParsingLambda_parsesPOJOs() {
+ PCollection<String> records =
+ csvRecords(
+ pipeline,
+ "instant,instantList",
+
"2024-01-23T10:00:05.000Z,10-00-05-2024-01-23;12-59-59-2024-01-24");
+ TimeContaining want =
+ timeContaining(
+ Instant.parse("2024-01-23T10:00:05.000Z"),
+ Arrays.asList(
+ Instant.parse("2024-01-23T10:00:05.000Z"),
+ Instant.parse("2024-01-24T12:59:59.000Z")));
+
+ CsvIOParse<TimeContaining> underTest =
+ underTest(
+ TIME_CONTAINING_SCHEMA,
+ CSVFormat.DEFAULT
+ .withHeader("instant", "instantList")
+ .withAllowDuplicateHeaderNames(false),
+ new HashMap<>(),
+ timeContainingFromRowFn(),
+ TIME_CONTAINING_CODER)
+ .withCustomRecordParsing("instantList",
instantListParsingLambda());
+
+ CsvIOParseResult<TimeContaining> result = records.apply(underTest);
+ PAssert.that(result.getOutput()).containsInAnyOrder(want);
+ PAssert.that(result.getErrors()).empty();
+
+ pipeline.run();
+ }
+
+ @Test
+ public void givenMultipleCustomParsingLambdas_parsesPOJOs() {
+ PCollection<String> records =
+ csvRecords(
+ pipeline,
+ "instant,instantList",
+ "2024-01-23@10:00:05,10-00-05-2024-01-23;12-59-59-2024-01-24");
+ TimeContaining want =
+ timeContaining(
+ Instant.parse("2024-01-23T10:00:05.000Z"),
+ Arrays.asList(
+ Instant.parse("2024-01-23T10:00:05.000Z"),
+ Instant.parse("2024-01-24T12:59:59.000Z")));
+
+ CsvIOParse<TimeContaining> underTest =
+ underTest(
+ TIME_CONTAINING_SCHEMA,
+ CSVFormat.DEFAULT
+ .withHeader("instant", "instantList")
+ .withAllowDuplicateHeaderNames(false),
+ new HashMap<>(),
+ timeContainingFromRowFn(),
+ TIME_CONTAINING_CODER)
+ .withCustomRecordParsing(
+ "instant",
+ input ->
+ DateTimeFormat.forPattern("yyyy-MM-dd@HH:mm:ss")
+ .parseDateTime(input)
+ .toInstant())
+ .withCustomRecordParsing("instantList",
instantListParsingLambda());
+
+ CsvIOParseResult<TimeContaining> result = records.apply(underTest);
+ PAssert.that(result.getOutput()).containsInAnyOrder(want);
+ PAssert.that(result.getErrors()).empty();
+
+ pipeline.run();
+ }
+
+ @Test
+ public void givenCustomParsingError_emits() {
+ PCollection<String> records =
+ csvRecords(pipeline, "instant,instantList",
"2024-01-23T10:00:05.000Z,BAD CELL");
+ CsvIOParse<TimeContaining> underTest =
+ underTest(
+ TIME_CONTAINING_SCHEMA,
+ CSVFormat.DEFAULT
+ .withHeader("instant", "instantList")
+ .withAllowDuplicateHeaderNames(false),
+ new HashMap<>(),
+ timeContainingFromRowFn(),
+ TIME_CONTAINING_CODER)
+ .withCustomRecordParsing("instantList",
instantListParsingLambda());
+
+ CsvIOParseResult<TimeContaining> result = records.apply(underTest);
+ PAssert.that(result.getOutput()).empty();
+
PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L);
+
+ pipeline.run();
+ }
+
private static CSVFormat csvFormat() {
return CSVFormat.DEFAULT
.withAllowDuplicateHeaderNames(false)
@@ -191,7 +301,16 @@ public class CsvIOParseTest {
return CsvIOParse.<T>builder().setConfigBuilder(configBuilder).build();
}
- private static Map<String, SerializableFunction<String, Object>>
emptyCustomProcessingMap() {
- return new HashMap<>();
+ private static SerializableFunction<String, List<Instant>>
instantListParsingLambda() {
+ return input -> {
+ Iterable<String> cells = Splitter.on(';').split(input);
+ ;
+ List<Instant> output = new ArrayList<>();
+ for (String cell : cells) {
+ output.add(
+
DateTimeFormat.forPattern("HH-mm-ss-yyyy-MM-dd").parseDateTime(cell).toInstant());
+ }
+ return output;
+ };
}
}