This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e646c28d2ac [CsvIO] Implemented CsvIOParseHelpers:parseCell (#31802)
e646c28d2ac is described below

commit e646c28d2ac82535221e135d503b3c3becb6eace
Author: lahariguduru <[email protected]>
AuthorDate: Thu Jul 11 16:27:23 2024 +0000

    [CsvIO] Implemented CsvIOParseHelpers:parseCell (#31802)
    
    * Created CsvIOHelpers method
    
    * Created CsvIOHelpers:parseCell method
    
    * deleted ExamplePojo class, created CsvIOParseHelpers::parseCell method
    
    * Changed IllegalArgumentException to UnsupportedOperationException in parseCell() method
    
    ---------
    
    Co-authored-by: Lahari Guduru <[email protected]>
---
 .../apache/beam/sdk/io/csv/CsvIOParseHelpers.java  |  36 +-
 .../beam/sdk/io/csv/CsvIOParseHelpersTest.java     | 373 +++++++++++++++++++++
 2 files changed, 407 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
index 042e284cd52..df99807cfea 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.csv;
 
+import java.math.BigDecimal;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.schemas.Schema;
@@ -48,8 +50,38 @@ final class CsvIOParseHelpers {
    * Parse the given {@link String} cell of the CSV record based on the given field's {@link
    * Schema.FieldType}.
    */
-  // TODO(https://github.com/apache/beam/issues/31719): implement method.
   static Object parseCell(String cell, Schema.Field field) {
-    return "";
+    Schema.FieldType fieldType = field.getType();
+    try {
+      switch (fieldType.getTypeName()) {
+        case STRING:
+          return cell;
+        case INT16:
+          return Short.parseShort(cell);
+        case INT32:
+          return Integer.parseInt(cell);
+        case INT64:
+          return Long.parseLong(cell);
+        case BOOLEAN:
+          return Boolean.parseBoolean(cell);
+        case BYTE:
+          return Byte.parseByte(cell);
+        case DECIMAL:
+          return new BigDecimal(cell);
+        case DOUBLE:
+          return Double.parseDouble(cell);
+        case FLOAT:
+          return Float.parseFloat(cell);
+        case DATETIME:
+          return Instant.parse(cell);
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported type: " + fieldType + ", consider using withCustomRecordParsing");
+      }
+
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(
+          e.getMessage() + " field " + field.getName() + " was received -- type mismatch");
+    }
   }
 }
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
new file mode 100644
index 00000000000..d6129055ae3
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.math.BigDecimal;
+import java.time.DateTimeException;
+import java.time.Instant;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.commons.collections.keyvalue.DefaultMapEntry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link CsvIOParseHelpers}. */
+@RunWith(JUnit4.class)
+public class CsvIOParseHelpersTest {
+
+  @Test
+  public void ignoresCaseFormat() {
+    String allCapsBool = "TRUE";
+    Schema schema = Schema.builder().addBooleanField("a_boolean").build();
+    assertEquals(true, CsvIOParseHelpers.parseCell(allCapsBool, 
schema.getField("a_boolean")));
+  }
+
+  @Test
+  public void givenIntegerWithSurroundingSpaces_throws() {
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("   12   ", 12);
+    Schema schema = 
Schema.builder().addInt32Field("an_integer").addStringField("a_string").build();
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvIOParseHelpers.parseCell(
+                    cellToExpectedValue.getKey().toString(), 
schema.getField("an_integer")));
+    assertEquals(
+        "For input string: \""
+            + cellToExpectedValue.getKey()
+            + "\" field "
+            + schema.getField("an_integer").getName()
+            + " was received -- type mismatch",
+        e.getMessage());
+  }
+
+  @Test
+  public void givenDoubleWithSurroundingSpaces_parses() {
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("   20.04   ", 
20.04);
+    Schema schema = 
Schema.builder().addDoubleField("a_double").addInt32Field("an_integer").build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_double")));
+  }
+
+  @Test
+  public void givenStringWithSurroundingSpaces_parsesIncorrectly() {
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("   a  ", "a");
+    Schema schema = 
Schema.builder().addStringField("a_string").addInt64Field("a_long").build();
+    assertEquals(
+        cellToExpectedValue.getKey(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_string")));
+  }
+
+  @Test
+  public void givenBigDecimalWithSurroundingSpaces_throws() {
+    BigDecimal decimal = new BigDecimal("123.456");
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("   123.456  ", 
decimal);
+    Schema schema =
+        
Schema.builder().addDecimalField("a_decimal").addStringField("a_string").build();
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            CsvIOParseHelpers.parseCell(
+                cellToExpectedValue.getKey().toString(), 
schema.getField("a_decimal")));
+  }
+
+  @Test
+  public void givenShortWithSurroundingSpaces_throws() {
+    Short shortNum = Short.parseShort("12");
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("   12   ", 
shortNum);
+    Schema schema =
+        Schema.builder()
+            .addInt16Field("a_short")
+            .addInt32Field("an_integer")
+            .addInt64Field("a_long")
+            .build();
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvIOParseHelpers.parseCell(
+                    cellToExpectedValue.getKey().toString(), 
schema.getField("a_short")));
+    assertEquals(
+        "For input string: \""
+            + cellToExpectedValue.getKey()
+            + "\" field "
+            + schema.getField("a_short").getName()
+            + " was received -- type mismatch",
+        e.getMessage());
+  }
+
+  @Test
+  public void givenLongWithSurroundingSpaces_throws() {
+    Long longNum = Long.parseLong("3400000000");
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("   12   ", 
longNum);
+    Schema schema =
+        Schema.builder()
+            .addInt16Field("a_short")
+            .addInt32Field("an_integer")
+            .addInt64Field("a_long")
+            .build();
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvIOParseHelpers.parseCell(
+                    cellToExpectedValue.getKey().toString(), 
schema.getField("a_long")));
+    assertEquals(
+        "For input string: \""
+            + cellToExpectedValue.getKey()
+            + "\" field "
+            + schema.getField("a_long").getName()
+            + " was received -- type mismatch",
+        e.getMessage());
+  }
+
+  @Test
+  public void givenFloatWithSurroundingSpaces_parses() {
+    Float floatNum = Float.parseFloat("3.141592");
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("   3.141592   ", floatNum);
+    Schema schema =
+        Schema.builder()
+            .addFloatField("a_float")
+            .addInt32Field("an_integer")
+            .addStringField("a_string")
+            .build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_float")));
+  }
+
+  @Test
+  public void givenDatetimeWithSurroundingSpaces() throws DateTimeException {
+    Instant datetime = Instant.parse("1234-01-23T10:00:05.000Z");
+    DefaultMapEntry cellToExpectedValue =
+        new DefaultMapEntry("   1234-01-23T10:00:05.000Z   ", datetime);
+    Schema schema =
+        
Schema.builder().addDateTimeField("a_datetime").addStringField("a_string").build();
+    DateTimeException e =
+        assertThrows(
+            DateTimeException.class,
+            () ->
+                CsvIOParseHelpers.parseCell(
+                    cellToExpectedValue.getKey().toString(), 
schema.getField("a_datetime")));
+    assertEquals(
+        "Text " + "'   1234-01-23T10:00:05.000Z   '" + " could not be parsed at index 0",
+        e.getMessage());
+  }
+
+  @Test
+  public void givenByteWithSurroundingSpaces_throws() {
+    Byte byteNum = Byte.parseByte("40");
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("   40   ", 
byteNum);
+    Schema schema = 
Schema.builder().addByteField("a_byte").addInt32Field("an_integer").build();
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvIOParseHelpers.parseCell(
+                    cellToExpectedValue.getKey().toString(), 
schema.getField("a_byte")));
+    assertEquals(
+        "For input string: \""
+            + cellToExpectedValue.getKey()
+            + "\" field "
+            + schema.getField("a_byte").getName()
+            + " was received -- type mismatch",
+        e.getMessage());
+  }
+
+  @Test
+  public void givenBooleanWithSurroundingSpaces_returnsInverse() {
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("   true    ", 
true);
+    Schema schema =
+        Schema.builder()
+            .addBooleanField("a_boolean")
+            .addInt32Field("an_integer")
+            .addStringField("a_string")
+            .build();
+    assertEquals(
+        false,
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_boolean")));
+  }
+
+  @Test
+  public void givenMultiLineCell_parses() {
+    String multiLineString = "a\na\na\na\na\na\na\na\na\nand";
+    Schema schema = 
Schema.builder().addStringField("a_string").addDoubleField("a_double").build();
+    assertEquals(
+        multiLineString, CsvIOParseHelpers.parseCell(multiLineString, 
schema.getField("a_string")));
+  }
+
+  @Test
+  public void givenValidIntegerCell_parses() {
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("12", 12);
+    Schema schema = 
Schema.builder().addInt32Field("an_integer").addInt64Field("a_long").build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("an_integer")));
+  }
+
+  @Test
+  public void givenValidDoubleCell_parses() {
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("10.05", 10.05);
+    Schema schema = 
Schema.builder().addDoubleField("a_double").addStringField("a_string").build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_double")));
+  }
+
+  @Test
+  public void givenValidStringCell_parses() {
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("lithium", 
"lithium");
+    Schema schema =
+        
Schema.builder().addStringField("a_string").addDateTimeField("a_datetime").build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_string")));
+  }
+
+  @Test
+  public void givenValidDecimalCell_parses() {
+    BigDecimal decimal = new BigDecimal("127.99");
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("127.99", 
decimal);
+    Schema schema =
+        
Schema.builder().addDecimalField("a_decimal").addDoubleField("a_double").build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_decimal")));
+  }
+
+  @Test
+  public void givenValidShortCell_parses() {
+    Short shortNum = Short.parseShort("36");
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("36", shortNum);
+    Schema schema =
+        Schema.builder()
+            .addInt32Field("an_integer")
+            .addInt64Field("a_long")
+            .addInt16Field("a_short")
+            .build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_short")));
+  }
+
+  @Test
+  public void givenValidLongCell_parses() {
+    Long longNum = Long.parseLong("1234567890");
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("1234567890", 
longNum);
+    Schema schema =
+        Schema.builder()
+            .addInt32Field("an_integer")
+            .addInt64Field("a_long")
+            .addInt16Field("a_short")
+            .build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_long")));
+  }
+
+  @Test
+  public void givenValidFloatCell_parses() {
+    Float floatNum = Float.parseFloat("3.141592");
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("3.141592", 
floatNum);
+    Schema schema = 
Schema.builder().addFloatField("a_float").addDoubleField("a_double").build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_float")));
+  }
+
+  @Test
+  public void givenValidDateTimeCell_parses() {
+    Instant datetime = Instant.parse("2020-01-01T00:00:00.000Z");
+    DefaultMapEntry cellToExpectedValue = new 
DefaultMapEntry("2020-01-01T00:00:00.000Z", datetime);
+    Schema schema =
+        
Schema.builder().addDateTimeField("a_datetime").addStringField("a_string").build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_datetime")));
+  }
+
+  @Test
+  public void givenValidByteCell_parses() {
+    Byte byteNum = Byte.parseByte("4");
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("4", byteNum);
+    Schema schema = 
Schema.builder().addByteField("a_byte").addInt32Field("an_integer").build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_byte")));
+  }
+
+  @Test
+  public void givenValidBooleanCell_parses() {
+    DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("false", false);
+    Schema schema =
+        
Schema.builder().addBooleanField("a_boolean").addStringField("a_string").build();
+    assertEquals(
+        cellToExpectedValue.getValue(),
+        CsvIOParseHelpers.parseCell(
+            cellToExpectedValue.getKey().toString(), 
schema.getField("a_boolean")));
+  }
+
+  @Test
+  public void givenCellSchemaFieldMismatch_throws() {
+    String boolTrue = "true";
+    Schema schema = 
Schema.builder().addBooleanField("a_boolean").addFloatField("a_float").build();
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> CsvIOParseHelpers.parseCell(boolTrue, 
schema.getField("a_float")));
+    assertEquals(
+        "For input string: \"" + boolTrue + "\" field a_float was received -- type mismatch",
+        e.getMessage());
+  }
+
+  @Test
+  public void givenCellUnsupportedType_throws() {
+    String counting = "[one,two,three]";
+    Schema schema =
+        Schema.builder()
+            .addField("an_array", 
Schema.FieldType.array(Schema.FieldType.STRING))
+            .addStringField("a_string")
+            .build();
+    UnsupportedOperationException e =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> CsvIOParseHelpers.parseCell(counting, 
schema.getField("an_array")));
+    assertEquals(
+        "Unsupported type: "
+            + schema.getField("an_array").getType()
+            + ", consider using withCustomRecordParsing",
+        e.getMessage());
+  }
+}

Reply via email to