This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b896cd5ab6d Improve Integer conversion in BigQueryIO's Storage Write API method. (#24366)
b896cd5ab6d is described below
commit b896cd5ab6ddd993ae2819ce5f40f43bab707459
Author: Sergei Lilichenko <[email protected]>
AuthorDate: Tue Dec 13 08:54:09 2022 -0800
Improve Integer conversion in BigQueryIO's Storage Write API method. (#24366)
* Improve Integer conversion in BigQueryIO's Storage Write API method.
* Improve Integer conversion in BigQueryIO's Storage Write API method.
Introduced a dedicated Exception. Improved error messages by capturing the
source failure and reducing message sizes.
* Improve Integer conversion in BigQueryIO's Storage Write API method.
Introduced a dedicated Exception; improved error messages by capturing the
source failure.
* Fixed a minor typo.
* Merged PR with new commits.
* Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
* Apply suggestions from code review
Apply spotless
* Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
Co-authored-by: slilichenko <Sergei Lilichenko>
Co-authored-by: Lukasz Cwik <[email protected]>
---
.../io/gcp/bigquery/TableRowToStorageApiProto.java | 46 +++++++++-
.../bigquery/TableRowToStorageApiProtoTest.java | 99 ++++++++++++++++++++++
2 files changed, 142 insertions(+), 3 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index cc2dd23b077..19559e75077 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -36,6 +36,7 @@ import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.math.RoundingMode;
import java.time.Instant;
import java.time.LocalDate;
@@ -82,7 +83,7 @@ public class TableRowToStorageApiProto {
.toFormatter()
.withZone(ZoneOffset.UTC);
- public static class SchemaConversionException extends Exception {
+ abstract static class SchemaConversionException extends Exception {
SchemaConversionException(String msg) {
super(msg);
}
@@ -108,6 +109,29 @@ public class TableRowToStorageApiProto {
}
}
+ public static class SingleValueConversionException extends SchemaConversionException {
+ SingleValueConversionException(Object sourceValue, SchemaInformation schema, Exception e) {
+ super(
+ "Column: "
+ + getPrettyFieldName(schema)
+ + " ("
+ + schema.getType()
+ + "). "
+ + "Value: "
+ + sourceValue
+ + " ("
+ + sourceValue.getClass().getName()
+ + "). Reason: "
+ + e);
+ }
+
+ private static String getPrettyFieldName(SchemaInformation schema) {
+ String fullName = schema.getFullName();
+ String rootPrefix = "root.";
+ return fullName.startsWith(rootPrefix) ? fullName.substring(rootPrefix.length()) : fullName;
+ }
+ }
+
///////////////////////////////////
// Conversion between TableSchema the json class and TableSchema the proto class.
@@ -590,9 +614,25 @@ public class TableRowToStorageApiProto {
switch (schemaInformation.getType()) {
case INT64:
if (value instanceof String) {
- return Long.valueOf((String) value);
+ try {
+ return Long.valueOf((String) value);
+ } catch (NumberFormatException e) {
+ throw new SingleValueConversionException(value, schemaInformation, e);
+ }
} else if (value instanceof Integer || value instanceof Long) {
return ((Number) value).longValue();
+ } else if (value instanceof BigDecimal) {
+ try {
+ return ((BigDecimal) value).longValueExact();
+ } catch (ArithmeticException e) {
+ throw new SingleValueConversionException(value, schemaInformation, e);
+ }
+ } else if (value instanceof BigInteger) {
+ try {
+ return ((BigInteger) value).longValueExact();
+ } catch (ArithmeticException e) {
+ throw new SingleValueConversionException(value, schemaInformation, e);
+ }
}
break;
case DOUBLE:
@@ -743,7 +783,7 @@ public class TableRowToStorageApiProto {
}
throw new SchemaDoesntMatchException(
- "Unexpected value :"
+ "Unexpected value: "
+ value
+ ", type: "
+ (value == null ? "null" : value.getClass())
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 1a708ac0537..da803298d03 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -31,14 +32,18 @@ import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaConversionException;
+import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaInformation;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -873,6 +878,100 @@ public class TableRowToStorageApiProtoTest {
assertTrue(repeatednof2.isEmpty());
}
+ @Test
+ public void testIntegerTypeConversion() throws DescriptorValidationException {
+ String intFieldName = "int_field";
+ TableSchema tableSchema =
+ new TableSchema()
+ .setFields(
+ ImmutableList.<TableFieldSchema>builder()
+ .add(
+ new TableFieldSchema()
+ .setType("INTEGER")
+ .setName(intFieldName)
+ .setMode("REQUIRED"))
+ .build());
+ TableRowToStorageApiProto.SchemaInformation schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
+ SchemaInformation fieldSchema = schemaInformation.getSchemaForField(intFieldName);
+ Descriptor schemaDescriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
+ FieldDescriptor fieldDescriptor = schemaDescriptor.findFieldByName(intFieldName);
+
+ Object[][] validIntValues =
+ new Object[][] {
+ // Source and expected converted values.
+ {"123", 123L},
+ {123L, 123L},
+ {123, 123L},
+ {new BigDecimal("123"), 123L},
+ {new BigInteger("123"), 123L}
+ };
+ for (Object[] validValue : validIntValues) {
+ Object sourceValue = validValue[0];
+ Long expectedConvertedValue = (Long) validValue[1];
+ try {
+ Object converted =
+ TableRowToStorageApiProto.singularFieldToProtoValue(
+ fieldSchema, fieldDescriptor, sourceValue, false);
+ assertEquals(expectedConvertedValue, converted);
+ } catch (SchemaConversionException e) {
+ fail(
+ "Failed to convert value "
+ + sourceValue
+ + " of type "
+ + validValue.getClass()
+ + " to INTEGER: "
+ + e);
+ }
+ }
+
+ Object[][] invalidIntValues =
+ new Object[][] {
+ // Value and expected error message
+ {
+ "12.123",
+ "Column: "
+ + intFieldName
+ + " (INT64). Value: 12.123 (java.lang.String). Reason: java.lang.NumberFormatException: For input string: \"12.123\""
+ },
+ {
+ Long.toString(Long.MAX_VALUE) + '0',
+ "Column: "
+ + intFieldName
+ + " (INT64). Value: 92233720368547758070 (java.lang.String). Reason: java.lang.NumberFormatException: For input string: \"92233720368547758070\""
+ },
+ {
+ new BigDecimal("12.123"),
+ "Column: "
+ + intFieldName
+ + " (INT64). Value: 12.123 (java.math.BigDecimal). Reason: java.lang.ArithmeticException: Rounding necessary"
+ },
+ {
+ new BigInteger(String.valueOf(Long.MAX_VALUE)).add(new BigInteger("10")),
+ "Column: "
+ + intFieldName
+ + " (INT64). Value: 9223372036854775817 (java.math.BigInteger). Reason: java.lang.ArithmeticException: BigInteger out of long range"
+ }
+ };
+ for (Object[] invalidValue : invalidIntValues) {
+ Object sourceValue = invalidValue[0];
+ String expectedError = (String) invalidValue[1];
+ try {
+ TableRowToStorageApiProto.singularFieldToProtoValue(
+ fieldSchema, fieldDescriptor, sourceValue, false);
+ fail(
+ "Expected to throw an exception converting "
+ + sourceValue
+ + " of type "
+ + invalidValue.getClass()
+ + " to INTEGER");
+ } catch (SchemaConversionException e) {
+ assertEquals("Exception message", expectedError, e.getMessage());
+ }
+ }
+ }
+
@Test
public void testRejectUnknownField() throws Exception {
TableRow row = new TableRow();