This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 408f67cfc83 Create CsvIOParseHelpers::mapFieldPositions method (#31889)
408f67cfc83 is described below

commit 408f67cfc83e14bbf688522f0239ac3f20bc6a0d
Author: lahariguduru <[email protected]>
AuthorDate: Mon Jul 15 21:18:21 2024 +0000

    Create CsvIOParseHelpers::mapFieldPositions method (#31889)
    
    Co-authored-by: Lahari Guduru <[email protected]>
---
 .../apache/beam/sdk/io/csv/CsvIOParseHelpers.java  | 34 ++++++++-
 .../beam/sdk/io/csv/CsvIOParseHelpersTest.java     | 88 ++++++++++++++++++++++
 2 files changed, 118 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
index 5273b43bfe0..856ccf42d84 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
@@ -22,9 +22,10 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 
 import java.math.BigDecimal;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.commons.csv.CSVFormat;
@@ -86,9 +87,34 @@ final class CsvIOParseHelpers {
    * Build a {@link List} of {@link Schema.Field}s corresponding to the expected position of each
    * field within the CSV record.
    */
-  // TODO(https://github.com/apache/beam/issues/31718): implement method.
-  static List<Schema.Field> mapFieldPositions(CSVFormat format, Schema schema) {
-    return new ArrayList<>();
+  static Map<Integer, Schema.Field> mapFieldPositions(CSVFormat format, Schema schema) {
+    String[] headerNames = format.getHeader();
+    if (headerNames == null) {
+      throw new IllegalArgumentException(
+          String.format("Illegal %s: header is required", CSVFormat.class));
+    }
+    List<String> header = Arrays.asList(headerNames);
+    Map<Integer, Schema.Field> indexToFieldMap = new HashMap<>();
+    for (Schema.Field field : schema.getFields()) {
+      int index = getIndex(header, field);
+      if (index >= 0) {
+        indexToFieldMap.put(index, field);
+      }
+    }
+    return indexToFieldMap;
+  }
+
+  /**
+   * Returns the header index of {@code field}; -1 if it is absent and nullable, otherwise throws.
+   */
+  private static int getIndex(List<String> header, Schema.Field field) {
+    String fieldName = field.getName();
+    int index = header.indexOf(fieldName);
+    if (index >= 0 || field.getType().getNullable()) {
+      return index;
+    }
+    throw new IllegalArgumentException(
+        String.format("header does not contain required %s field: %s", Schema.class, fieldName));
   }
 
   /**
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
index f4ba855dfc2..5a387652022 100644
--- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertThrows;
 import java.math.BigDecimal;
 import java.time.DateTimeException;
 import java.time.Instant;
+import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.commons.collections.keyvalue.DefaultMapEntry;
 import org.apache.commons.csv.CSVFormat;
 import org.junit.Test;
@@ -154,6 +156,92 @@ public class CsvIOParseHelpersTest {
 
   /** End of tests for {@link CsvIOParseHelpers#validateCsvFormatWithSchema(CSVFormat, Schema)}. */
   //////////////////////////////////////////////////////////////////////////////////////////////
+  /** Tests for {@link CsvIOParseHelpers#mapFieldPositions(CSVFormat, Schema)}. */
+  @Test
+  public void givenHeaderWithComments_mapsPositions() {
+    String[] comments = {"first line", "second line", "third line"};
+    Schema schema =
+        Schema.builder().addStringField("a_string").addStringField("another_string").build();
+    ImmutableMap<Integer, Schema.Field> want =
+        ImmutableMap.of(0, schema.getField("a_string"), 1, schema.getField("another_string"));
+    Map<Integer, Schema.Field> got =
+        CsvIOParseHelpers.mapFieldPositions(
+            csvFormat()
+                .withHeader("a_string", "another_string")
+                .withHeaderComments((Object[]) comments),
+            schema);
+    assertEquals(want, got);
+  }
+
+  @Test
+  public void givenMatchingHeaderAndSchemaField_mapsPositions() {
+    Schema schema =
+        Schema.builder()
+            .addStringField("a_string")
+            .addDoubleField("a_double")
+            .addInt32Field("an_integer")
+            .build();
+    ImmutableMap<Integer, Schema.Field> want =
+        ImmutableMap.of(
+            0,
+            schema.getField("a_string"),
+            1,
+            schema.getField("an_integer"),
+            2,
+            schema.getField("a_double"));
+    Map<Integer, Schema.Field> got =
+        CsvIOParseHelpers.mapFieldPositions(
+            csvFormat().withHeader("a_string", "an_integer", "a_double"), schema);
+    assertEquals(want, got);
+  }
+
+  @Test
+  public void givenNullableSchemaFieldsMissingFromHeader_skipsThem() {
+    Schema schema =
+        Schema.builder()
+            .addNullableStringField("a_string")
+            .addDoubleField("a_double")
+            .addInt32Field("an_integer")
+            .addDateTimeField("a_datetime")
+            .addNullableStringField("another_string")
+            .build();
+    ImmutableMap<Integer, Schema.Field> want =
+        ImmutableMap.of(
+            0,
+            schema.getField("an_integer"),
+            1,
+            schema.getField("a_double"),
+            2,
+            schema.getField("a_datetime"));
+    Map<Integer, Schema.Field> got =
+        CsvIOParseHelpers.mapFieldPositions(
+            csvFormat().withHeader("an_integer", "a_double", "a_datetime"), schema);
+    assertEquals(want, got);
+  }
+
+  @Test
+  public void givenNonNullableHeaderAndSchemaFieldMismatch_throws() {
+    Schema schema =
+        Schema.builder()
+            .addStringField("another_string")
+            .addInt32Field("an_integer")
+            .addStringField("a_string")
+            .build();
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvIOParseHelpers.mapFieldPositions(
+                    csvFormat().withHeader("an_integer", "a_string"), schema));
+    assertEquals(
+        "header does not contain required class org.apache.beam.sdk.schemas.Schema field: "
+            + schema.getField("another_string").getName(),
+        e.getMessage());
+  }
+
+  /** End of tests for {@link CsvIOParseHelpers#mapFieldPositions(CSVFormat, Schema)}. */
+
+  ////////////////////////////////////////////////////////////////////////////////////////////
 
   /** Tests for {@link CsvIOParseHelpers#parseCell(String, Schema.Field)}. */
   @Test

Reply via email to