This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 408f67cfc83 Create CsvIOParseHelpers::mapFieldPositions method (#31889)
408f67cfc83 is described below
commit 408f67cfc83e14bbf688522f0239ac3f20bc6a0d
Author: lahariguduru <[email protected]>
AuthorDate: Mon Jul 15 21:18:21 2024 +0000
Create CsvIOParseHelpers::mapFieldPositions method (#31889)
Co-authored-by: Lahari Guduru <[email protected]>
---
.../apache/beam/sdk/io/csv/CsvIOParseHelpers.java | 34 ++++++++-
.../beam/sdk/io/csv/CsvIOParseHelpersTest.java | 88 ++++++++++++++++++++++
2 files changed, 118 insertions(+), 4 deletions(-)
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
index 5273b43bfe0..856ccf42d84 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
@@ -22,9 +22,10 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import java.math.BigDecimal;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.commons.csv.CSVFormat;
@@ -86,9 +87,34 @@ final class CsvIOParseHelpers {
* Build a {@link List} of {@link Schema.Field}s corresponding to the expected position of each
* field within the CSV record.
*/
- // TODO(https://github.com/apache/beam/issues/31718): implement method.
- static List<Schema.Field> mapFieldPositions(CSVFormat format, Schema schema) {
- return new ArrayList<>();
+ static Map<Integer, Schema.Field> mapFieldPositions(CSVFormat format, Schema schema) {
+ List<String> header = Arrays.asList(format.getHeader());
+ Map<Integer, Schema.Field> indexToFieldMap = new HashMap<>();
+ for (Schema.Field field : schema.getFields()) {
+ int index = getIndex(header, field);
+ if (index >= 0) {
+ indexToFieldMap.put(index, field);
+ }
+ }
+ return indexToFieldMap;
+ }
+
+ /**
+ * Attains expected index from {@link CSVFormat's} header matching a given {@link Schema.Field}.
+ */
+ private static int getIndex(List<String> header, Schema.Field field) {
+ String fieldName = field.getName();
+ boolean presentInHeader = header.contains(fieldName);
+ boolean isNullable = field.getType().getNullable();
+ if (presentInHeader) {
+ return header.indexOf(fieldName);
+ }
+ if (isNullable) {
+ return -1;
+ }
+
+ throw new IllegalArgumentException(
+ String.format("header does not contain required %s field: %s", Schema.class, fieldName));
}
/**
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
index f4ba855dfc2..5a387652022 100644
--- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertThrows;
import java.math.BigDecimal;
import java.time.DateTimeException;
import java.time.Instant;
+import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.collections.keyvalue.DefaultMapEntry;
import org.apache.commons.csv.CSVFormat;
import org.junit.Test;
@@ -154,6 +156,92 @@ public class CsvIOParseHelpersTest {
/** End of tests for {@link CsvIOParseHelpers#validateCsvFormatWithSchema(CSVFormat, Schema)}. */
//////////////////////////////////////////////////////////////////////////////////////////////
+ /** Tests for {@link CsvIOParseHelpers#mapFieldPositions(CSVFormat, Schema)}. */
+ @Test
+ public void testHeaderWithComments() {
+ String[] comments = {"first line", "second line", "third line"};
+ Schema schema =
+ Schema.builder().addStringField("a_string").addStringField("another_string").build();
+ ImmutableMap<Integer, Schema.Field> want =
+ ImmutableMap.of(0, schema.getField("a_string"), 1, schema.getField("another_string"));
+ Map<Integer, Schema.Field> got =
+ CsvIOParseHelpers.mapFieldPositions(
+ csvFormat()
+ .withHeader("a_string", "another_string")
+ .withHeaderComments((Object) comments),
+ schema);
+ assertEquals(want, got);
+ }
+
+ @Test
+ public void givenMatchingHeaderAndSchemaField_mapsPositions() {
+ Schema schema =
+ Schema.builder()
+ .addStringField("a_string")
+ .addDoubleField("a_double")
+ .addInt32Field("an_integer")
+ .build();
+ ImmutableMap<Integer, Schema.Field> want =
+ ImmutableMap.of(
+ 0,
+ schema.getField("a_string"),
+ 1,
+ schema.getField("an_integer"),
+ 2,
+ schema.getField("a_double"));
+ Map<Integer, Schema.Field> got =
+ CsvIOParseHelpers.mapFieldPositions(
+ csvFormat().withHeader("a_string", "an_integer", "a_double"), schema);
+ assertEquals(want, got);
+ }
+
+ @Test
+ public void givenSchemaContainsNullableFieldTypes() {
+ Schema schema =
+ Schema.builder()
+ .addNullableStringField("a_string")
+ .addDoubleField("a_double")
+ .addInt32Field("an_integer")
+ .addDateTimeField("a_datetime")
+ .addNullableStringField("another_string")
+ .build();
+ ImmutableMap<Integer, Schema.Field> want =
+ ImmutableMap.of(
+ 0,
+ schema.getField("an_integer"),
+ 1,
+ schema.getField("a_double"),
+ 2,
+ schema.getField("a_datetime"));
+ Map<Integer, Schema.Field> got =
+ CsvIOParseHelpers.mapFieldPositions(
+ csvFormat().withHeader("an_integer", "a_double", "a_datetime"), schema);
+ assertEquals(want, got);
+ }
+
+ @Test
+ public void givenNonNullableHeaderAndSchemaFieldMismatch_throws() {
+ Schema schema =
+ Schema.builder()
+ .addStringField("another_string")
+ .addInt32Field("an_integer")
+ .addStringField("a_string")
+ .build();
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ CsvIOParseHelpers.mapFieldPositions(
+ csvFormat().withHeader("an_integer", "a_string"), schema));
+ assertEquals(
+ "header does not contain required class org.apache.beam.sdk.schemas.Schema field: "
+ + schema.getField("another_string").getName(),
+ e.getMessage());
+ }
+
+ /** End of tests for {@link CsvIOParseHelpers#mapFieldPositions(CSVFormat, Schema)} */
+
+
////////////////////////////////////////////////////////////////////////////////////////////
/** Tests for {@link CsvIOParseHelpers#parseCell(String, Schema.Field)}. */
@Test