This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e646c28d2ac [CsvIO] Implemented CsvIOParseHelpers:parseCell (#31802)
e646c28d2ac is described below
commit e646c28d2ac82535221e135d503b3c3becb6eace
Author: lahariguduru <[email protected]>
AuthorDate: Thu Jul 11 16:27:23 2024 +0000
[CsvIO] Implemented CsvIOParseHelpers:parseCell (#31802)
* Created CsvIOHelpers method
* Created CsvIOHelpers::parseCell method
* deleted ExamplePojo class, created CsvIOParseHelpers::parseCell method
* Changed IllegalArgumentException to UnsupportedOperationException for unsupported field
types in the parseCell() method
---------
Co-authored-by: Lahari Guduru <[email protected]>
---
.../apache/beam/sdk/io/csv/CsvIOParseHelpers.java | 36 +-
.../beam/sdk/io/csv/CsvIOParseHelpersTest.java | 373 +++++++++++++++++++++
2 files changed, 407 insertions(+), 2 deletions(-)
diff --git
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
index 042e284cd52..df99807cfea 100644
---
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
+++
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.csv;
+import java.math.BigDecimal;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
@@ -48,8 +50,38 @@ final class CsvIOParseHelpers {
* Parse the given {@link String} cell of the CSV record based on the given
field's {@link
* Schema.FieldType}.
*/
- // TODO(https://github.com/apache/beam/issues/31719): implement method.
static Object parseCell(String cell, Schema.Field field) {
- return "";
+ Schema.FieldType fieldType = field.getType();
+ try {
+ switch (fieldType.getTypeName()) {
+ case STRING:
+ return cell;
+ case INT16:
+ return Short.parseShort(cell);
+ case INT32:
+ return Integer.parseInt(cell);
+ case INT64:
+ return Long.parseLong(cell);
+ case BOOLEAN:
+ return Boolean.parseBoolean(cell);
+ case BYTE:
+ return Byte.parseByte(cell);
+ case DECIMAL:
+ return new BigDecimal(cell);
+ case DOUBLE:
+ return Double.parseDouble(cell);
+ case FLOAT:
+ return Float.parseFloat(cell);
+ case DATETIME:
+ return Instant.parse(cell);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported type: " + fieldType + ", consider using
withCustomRecordParsing");
+ }
+
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ e.getMessage() + " field " + field.getName() + " was received --
type mismatch");
+ }
}
}
diff --git
a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
new file mode 100644
index 00000000000..d6129055ae3
--- /dev/null
+++
b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.math.BigDecimal;
+import java.time.DateTimeException;
+import java.time.Instant;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.commons.collections.keyvalue.DefaultMapEntry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link CsvIOParseHelpers}. */
+@RunWith(JUnit4.class)
+public class CsvIOParseHelpersTest {
+
+ @Test
+ public void ignoresCaseFormat() {
+ String allCapsBool = "TRUE";
+ Schema schema = Schema.builder().addBooleanField("a_boolean").build();
+ assertEquals(true, CsvIOParseHelpers.parseCell(allCapsBool,
schema.getField("a_boolean")));
+ }
+
+ @Test
+ public void givenIntegerWithSurroundingSpaces_throws() {
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry(" 12 ", 12);
+ Schema schema =
Schema.builder().addInt32Field("an_integer").addStringField("a_string").build();
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("an_integer")));
+ assertEquals(
+ "For input string: \""
+ + cellToExpectedValue.getKey()
+ + "\" field "
+ + schema.getField("an_integer").getName()
+ + " was received -- type mismatch",
+ e.getMessage());
+ }
+
+ @Test
+ public void givenDoubleWithSurroundingSpaces_parses() {
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry(" 20.04 ",
20.04);
+ Schema schema =
Schema.builder().addDoubleField("a_double").addInt32Field("an_integer").build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_double")));
+ }
+
+ @Test
+ public void givenStringWithSurroundingSpaces_parsesIncorrectly() {
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry(" a ", "a");
+ Schema schema =
Schema.builder().addStringField("a_string").addInt64Field("a_long").build();
+ assertEquals(
+ cellToExpectedValue.getKey(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_string")));
+ }
+
+ @Test
+ public void givenBigDecimalWithSurroundingSpaces_throws() {
+ BigDecimal decimal = new BigDecimal("123.456");
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry(" 123.456 ",
decimal);
+ Schema schema =
+
Schema.builder().addDecimalField("a_decimal").addStringField("a_string").build();
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_decimal")));
+ }
+
+ @Test
+ public void givenShortWithSurroundingSpaces_throws() {
+ Short shortNum = Short.parseShort("12");
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry(" 12 ",
shortNum);
+ Schema schema =
+ Schema.builder()
+ .addInt16Field("a_short")
+ .addInt32Field("an_integer")
+ .addInt64Field("a_long")
+ .build();
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_short")));
+ assertEquals(
+ "For input string: \""
+ + cellToExpectedValue.getKey()
+ + "\" field "
+ + schema.getField("a_short").getName()
+ + " was received -- type mismatch",
+ e.getMessage());
+ }
+
+ @Test
+ public void givenLongWithSurroundingSpaces_throws() {
+ Long longNum = Long.parseLong("3400000000");
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry(" 12 ",
longNum);
+ Schema schema =
+ Schema.builder()
+ .addInt16Field("a_short")
+ .addInt32Field("an_integer")
+ .addInt64Field("a_long")
+ .build();
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_long")));
+ assertEquals(
+ "For input string: \""
+ + cellToExpectedValue.getKey()
+ + "\" field "
+ + schema.getField("a_long").getName()
+ + " was received -- type mismatch",
+ e.getMessage());
+ }
+
+ @Test
+ public void givenFloatWithSurroundingSpaces_parses() {
+ Float floatNum = Float.parseFloat("3.141592");
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry(" 3.141592
", floatNum);
+ Schema schema =
+ Schema.builder()
+ .addFloatField("a_float")
+ .addInt32Field("an_integer")
+ .addStringField("a_string")
+ .build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_float")));
+ }
+
+ @Test
+ public void givenDatetimeWithSurroundingSpaces() throws DateTimeException {
+ Instant datetime = Instant.parse("1234-01-23T10:00:05.000Z");
+ DefaultMapEntry cellToExpectedValue =
+ new DefaultMapEntry(" 1234-01-23T10:00:05.000Z ", datetime);
+ Schema schema =
+
Schema.builder().addDateTimeField("a_datetime").addStringField("a_string").build();
+ DateTimeException e =
+ assertThrows(
+ DateTimeException.class,
+ () ->
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_datetime")));
+ assertEquals(
+ "Text " + "' 1234-01-23T10:00:05.000Z '" + " could not be parsed
at index 0",
+ e.getMessage());
+ }
+
+ @Test
+ public void givenByteWithSurroundingSpaces_throws() {
+ Byte byteNum = Byte.parseByte("40");
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry(" 40 ",
byteNum);
+ Schema schema =
Schema.builder().addByteField("a_byte").addInt32Field("an_integer").build();
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_byte")));
+ assertEquals(
+ "For input string: \""
+ + cellToExpectedValue.getKey()
+ + "\" field "
+ + schema.getField("a_byte").getName()
+ + " was received -- type mismatch",
+ e.getMessage());
+ }
+
+ @Test
+ public void givenBooleanWithSurroundingSpaces_returnsInverse() {
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry(" true ",
true);
+ Schema schema =
+ Schema.builder()
+ .addBooleanField("a_boolean")
+ .addInt32Field("an_integer")
+ .addStringField("a_string")
+ .build();
+ assertEquals(
+ false,
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_boolean")));
+ }
+
+ @Test
+ public void givenMultiLineCell_parses() {
+ String multiLineString = "a\na\na\na\na\na\na\na\na\nand";
+ Schema schema =
Schema.builder().addStringField("a_string").addDoubleField("a_double").build();
+ assertEquals(
+ multiLineString, CsvIOParseHelpers.parseCell(multiLineString,
schema.getField("a_string")));
+ }
+
+ @Test
+ public void givenValidIntegerCell_parses() {
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("12", 12);
+ Schema schema =
Schema.builder().addInt32Field("an_integer").addInt64Field("a_long").build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("an_integer")));
+ }
+
+ @Test
+ public void givenValidDoubleCell_parses() {
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("10.05", 10.05);
+ Schema schema =
Schema.builder().addDoubleField("a_double").addStringField("a_string").build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_double")));
+ }
+
+ @Test
+ public void givenValidStringCell_parses() {
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("lithium",
"lithium");
+ Schema schema =
+
Schema.builder().addStringField("a_string").addDateTimeField("a_datetime").build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_string")));
+ }
+
+ @Test
+ public void givenValidDecimalCell_parses() {
+ BigDecimal decimal = new BigDecimal("127.99");
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("127.99",
decimal);
+ Schema schema =
+
Schema.builder().addDecimalField("a_decimal").addDoubleField("a_double").build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_decimal")));
+ }
+
+ @Test
+ public void givenValidShortCell_parses() {
+ Short shortNum = Short.parseShort("36");
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("36", shortNum);
+ Schema schema =
+ Schema.builder()
+ .addInt32Field("an_integer")
+ .addInt64Field("a_long")
+ .addInt16Field("a_short")
+ .build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_short")));
+ }
+
+ @Test
+ public void givenValidLongCell_parses() {
+ Long longNum = Long.parseLong("1234567890");
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("1234567890",
longNum);
+ Schema schema =
+ Schema.builder()
+ .addInt32Field("an_integer")
+ .addInt64Field("a_long")
+ .addInt16Field("a_short")
+ .build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_long")));
+ }
+
+ @Test
+ public void givenValidFloatCell_parses() {
+ Float floatNum = Float.parseFloat("3.141592");
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("3.141592",
floatNum);
+ Schema schema =
Schema.builder().addFloatField("a_float").addDoubleField("a_double").build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_float")));
+ }
+
+ @Test
+ public void givenValidDateTimeCell_parses() {
+ Instant datetime = Instant.parse("2020-01-01T00:00:00.000Z");
+ DefaultMapEntry cellToExpectedValue = new
DefaultMapEntry("2020-01-01T00:00:00.000Z", datetime);
+ Schema schema =
+
Schema.builder().addDateTimeField("a_datetime").addStringField("a_string").build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_datetime")));
+ }
+
+ @Test
+ public void givenValidByteCell_parses() {
+ Byte byteNum = Byte.parseByte("4");
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("4", byteNum);
+ Schema schema =
Schema.builder().addByteField("a_byte").addInt32Field("an_integer").build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_byte")));
+ }
+
+ @Test
+ public void givenValidBooleanCell_parses() {
+ DefaultMapEntry cellToExpectedValue = new DefaultMapEntry("false", false);
+ Schema schema =
+
Schema.builder().addBooleanField("a_boolean").addStringField("a_string").build();
+ assertEquals(
+ cellToExpectedValue.getValue(),
+ CsvIOParseHelpers.parseCell(
+ cellToExpectedValue.getKey().toString(),
schema.getField("a_boolean")));
+ }
+
+ @Test
+ public void givenCellSchemaFieldMismatch_throws() {
+ String boolTrue = "true";
+ Schema schema =
Schema.builder().addBooleanField("a_boolean").addFloatField("a_float").build();
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> CsvIOParseHelpers.parseCell(boolTrue,
schema.getField("a_float")));
+ assertEquals(
+ "For input string: \"" + boolTrue + "\" field a_float was received --
type mismatch",
+ e.getMessage());
+ }
+
+ @Test
+ public void givenCellUnsupportedType_throws() {
+ String counting = "[one,two,three]";
+ Schema schema =
+ Schema.builder()
+ .addField("an_array",
Schema.FieldType.array(Schema.FieldType.STRING))
+ .addStringField("a_string")
+ .build();
+ UnsupportedOperationException e =
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> CsvIOParseHelpers.parseCell(counting,
schema.getField("an_array")));
+ assertEquals(
+ "Unsupported type: "
+ + schema.getField("an_array").getType()
+ + ", consider using withCustomRecordParsing",
+ e.getMessage());
+ }
+}