This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fc5a71db5ca [CsvIO]: Implement CsvIOParse::withCustomRecordParsing method (#32142)
fc5a71db5ca is described below

commit fc5a71db5caa95fd14988bfe475c240873216a2c
Author: Francis O'Hara <[email protected]>
AuthorDate: Fri Aug 9 23:12:22 2024 +0000

    [CsvIO]: Implement CsvIOParse::withCustomRecordParsing method (#32142)
    
    * completed implementation without tests
    
    Co-authored-by: Lahari Guduru <[email protected]>
    
    * intermediate stage
    
    Co-authored-by: Lahari Guduru <[email protected]>
    
    * Implement CsvIOParse.withCustomRecordParsing
    
    Co-authored-by: Lahari Guduru <[email protected]>
    
    ---------
    
    Co-authored-by: Lahari Guduru <[email protected]>
---
 .../org/apache/beam/sdk/io/csv/CsvIOParse.java     |  28 ++++-
 .../beam/sdk/io/csv/CsvIOParseConfiguration.java   |  12 +-
 .../org/apache/beam/sdk/io/csv/CsvIOParseTest.java | 127 ++++++++++++++++++++-
 3 files changed, 158 insertions(+), 9 deletions(-)

diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java
index 0a27cdbc57e..5981e813276 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.NonNull;
 
 /**
  * {@link PTransform} for Parsing CSV Record Strings into {@link 
Schema}-mapped target types. {@link
@@ -43,9 +44,30 @@ public abstract class CsvIOParse<T> extends 
PTransform<PCollection<String>, CsvI
     return new AutoValue_CsvIOParse.Builder<>();
   }
 
-  // TODO(https://github.com/apache/beam/issues/31875): Implement in future PR.
-  public CsvIOParse<T> withCustomRecordParsing(
-      Map<String, SerializableFunction<String, Object>> customProcessingMap) {
+  /**
+   * Configures custom cell parsing.
+   *
+   * <h2>Example</h2>
+   *
+   * <pre>{@code
+   * CsvIO.parse().withCustomRecordParsing("listOfInts", cell -> {
+   *
+   *  List<Integer> result = new ArrayList<>();
+   *  for (String stringValue: Splitter.on(";").split(cell)) {
+   *    result.add(Integer.parseInt(stringValue));
+   *  }
+   *  return result;
+   * });
+   * }</pre>
+   */
+  public <OutputT extends @NonNull Object> CsvIOParse<T> 
withCustomRecordParsing(
+      String fieldName, SerializableFunction<String, OutputT> 
customRecordParsingFn) {
+
+    Map<String, SerializableFunction<String, Object>> customProcessingMap =
+        getConfigBuilder().getOrCreateCustomProcessingMap();
+
+    customProcessingMap.put(fieldName, customRecordParsingFn::apply);
+    getConfigBuilder().setCustomProcessingMap(customProcessingMap);
     return this;
   }
 
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
index dd9ef5b3486..2be871a9dc2 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java
@@ -60,18 +60,26 @@ abstract class CsvIOParseConfiguration<T> implements 
Serializable {
     abstract Builder<T> setCustomProcessingMap(
         Map<String, SerializableFunction<String, Object>> customProcessingMap);
 
+    abstract Optional<Map<String, SerializableFunction<String, Object>>> 
getCustomProcessingMap();
+
+    final Map<String, SerializableFunction<String, Object>> 
getOrCreateCustomProcessingMap() {
+      if (!getCustomProcessingMap().isPresent()) {
+        setCustomProcessingMap(new HashMap<>());
+      }
+      return getCustomProcessingMap().get();
+    }
+
     abstract Builder<T> setCoder(Coder<T> coder);
 
     abstract Builder<T> setFromRowFn(SerializableFunction<Row, T> fromRowFn);
 
-    abstract Optional<Map<String, SerializableFunction<String, Object>>> 
getCustomProcessingMap();
-
     abstract CsvIOParseConfiguration<T> autoBuild();
 
     final CsvIOParseConfiguration<T> build() {
       if (!getCustomProcessingMap().isPresent()) {
         setCustomProcessingMap(new HashMap<>());
       }
+
       return autoBuild();
     }
   }
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java
index 05d6982004f..a517cef3d51 100644
--- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java
@@ -19,10 +19,17 @@ package org.apache.beam.sdk.io.csv;
 
 import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
 import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR;
+import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA;
+import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_TYPE_DESCRIPTOR;
+import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TimeContaining;
 import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypes;
 import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesFromRowFn;
 import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesToRowFn;
+import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContaining;
+import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingFromRowFn;
+import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -38,17 +45,22 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import org.apache.commons.csv.CSVFormat;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+/** Tests for {@link CsvIOParse}. */
 @RunWith(JUnit4.class)
 public class CsvIOParseTest {
 
@@ -61,6 +73,12 @@ public class CsvIOParseTest {
               NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR,
               nullableAllPrimitiveDataTypesToRowFn(),
               nullableAllPrimitiveDataTypesFromRowFn());
+  private static final Coder<TimeContaining> TIME_CONTAINING_CODER =
+      SchemaCoder.of(
+          TIME_CONTAINING_SCHEMA,
+          TIME_CONTAINING_TYPE_DESCRIPTOR,
+          timeContainingToRowFn(),
+          timeContainingFromRowFn());
   private static final SerializableFunction<Row, Row> 
ROW_ROW_SERIALIZABLE_FUNCTION = row -> row;
   @Rule public final TestPipeline pipeline = TestPipeline.create();
 
@@ -120,7 +138,7 @@ public class CsvIOParseTest {
             underTest(
                 NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA,
                 csvFormat(),
-                emptyCustomProcessingMap(),
+                new HashMap<>(),
                 ROW_ROW_SERIALIZABLE_FUNCTION,
                 RowCoder.of(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA)));
     PAssert.that(result.getOutput()).containsInAnyOrder(want);
@@ -152,7 +170,7 @@ public class CsvIOParseTest {
             underTest(
                 NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA,
                 csvFormat(),
-                emptyCustomProcessingMap(),
+                new HashMap<>(),
                 nullableAllPrimitiveDataTypesFromRowFn(),
                 NULLABLE_ALL_PRIMITIVE_DATA_TYPES_CODER));
     PAssert.that(result.getOutput()).containsInAnyOrder(want);
@@ -161,6 +179,98 @@ public class CsvIOParseTest {
     pipeline.run();
   }
 
+  @Test
+  public void givenSingleCustomParsingLambda_parsesPOJOs() {
+    PCollection<String> records =
+        csvRecords(
+            pipeline,
+            "instant,instantList",
+            
"2024-01-23T10:00:05.000Z,10-00-05-2024-01-23;12-59-59-2024-01-24");
+    TimeContaining want =
+        timeContaining(
+            Instant.parse("2024-01-23T10:00:05.000Z"),
+            Arrays.asList(
+                Instant.parse("2024-01-23T10:00:05.000Z"),
+                Instant.parse("2024-01-24T12:59:59.000Z")));
+
+    CsvIOParse<TimeContaining> underTest =
+        underTest(
+                TIME_CONTAINING_SCHEMA,
+                CSVFormat.DEFAULT
+                    .withHeader("instant", "instantList")
+                    .withAllowDuplicateHeaderNames(false),
+                new HashMap<>(),
+                timeContainingFromRowFn(),
+                TIME_CONTAINING_CODER)
+            .withCustomRecordParsing("instantList", 
instantListParsingLambda());
+
+    CsvIOParseResult<TimeContaining> result = records.apply(underTest);
+    PAssert.that(result.getOutput()).containsInAnyOrder(want);
+    PAssert.that(result.getErrors()).empty();
+
+    pipeline.run();
+  }
+
+  @Test
+  public void givenMultipleCustomParsingLambdas_parsesPOJOs() {
+    PCollection<String> records =
+        csvRecords(
+            pipeline,
+            "instant,instantList",
+            "2024-01-23@10:00:05,10-00-05-2024-01-23;12-59-59-2024-01-24");
+    TimeContaining want =
+        timeContaining(
+            Instant.parse("2024-01-23T10:00:05.000Z"),
+            Arrays.asList(
+                Instant.parse("2024-01-23T10:00:05.000Z"),
+                Instant.parse("2024-01-24T12:59:59.000Z")));
+
+    CsvIOParse<TimeContaining> underTest =
+        underTest(
+                TIME_CONTAINING_SCHEMA,
+                CSVFormat.DEFAULT
+                    .withHeader("instant", "instantList")
+                    .withAllowDuplicateHeaderNames(false),
+                new HashMap<>(),
+                timeContainingFromRowFn(),
+                TIME_CONTAINING_CODER)
+            .withCustomRecordParsing(
+                "instant",
+                input ->
+                    DateTimeFormat.forPattern("yyyy-MM-dd@HH:mm:ss")
+                        .parseDateTime(input)
+                        .toInstant())
+            .withCustomRecordParsing("instantList", 
instantListParsingLambda());
+
+    CsvIOParseResult<TimeContaining> result = records.apply(underTest);
+    PAssert.that(result.getOutput()).containsInAnyOrder(want);
+    PAssert.that(result.getErrors()).empty();
+
+    pipeline.run();
+  }
+
+  @Test
+  public void givenCustomParsingError_emits() {
+    PCollection<String> records =
+        csvRecords(pipeline, "instant,instantList", 
"2024-01-23T10:00:05.000Z,BAD CELL");
+    CsvIOParse<TimeContaining> underTest =
+        underTest(
+                TIME_CONTAINING_SCHEMA,
+                CSVFormat.DEFAULT
+                    .withHeader("instant", "instantList")
+                    .withAllowDuplicateHeaderNames(false),
+                new HashMap<>(),
+                timeContainingFromRowFn(),
+                TIME_CONTAINING_CODER)
+            .withCustomRecordParsing("instantList", 
instantListParsingLambda());
+
+    CsvIOParseResult<TimeContaining> result = records.apply(underTest);
+    PAssert.that(result.getOutput()).empty();
+    
PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L);
+
+    pipeline.run();
+  }
+
   private static CSVFormat csvFormat() {
     return CSVFormat.DEFAULT
         .withAllowDuplicateHeaderNames(false)
@@ -191,7 +301,16 @@ public class CsvIOParseTest {
     return CsvIOParse.<T>builder().setConfigBuilder(configBuilder).build();
   }
 
-  private static Map<String, SerializableFunction<String, Object>> 
emptyCustomProcessingMap() {
-    return new HashMap<>();
+  private static SerializableFunction<String, List<Instant>> 
instantListParsingLambda() {
+    return input -> {
+      Iterable<String> cells = Splitter.on(';').split(input);
+      ;
+      List<Instant> output = new ArrayList<>();
+      for (String cell : cells) {
+        output.add(
+            
DateTimeFormat.forPattern("HH-mm-ss-yyyy-MM-dd").parseDateTime(cell).toInstant());
+      }
+      return output;
+    };
   }
 }

Reply via email to