This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6d8e02e2629 [java] BQ: add missing avro conversions to BQ TableRow
(#33221)
6d8e02e2629 is described below
commit 6d8e02e262951f26e8a4e835df462a219155bc11
Author: Michel Davit <[email protected]>
AuthorDate: Tue Dec 10 18:36:44 2024 +0100
[java] BQ: add missing avro conversions to BQ TableRow (#33221)
* [java] BQ: add missing avro conversions to BQ TableRow
Avro float fields can be used to write BQ FLOAT columns.
Add TableRow conversion for such field.
Adding conversion for avro 1.10+ logical types local-timestamp-millis
and local-timestamp-micros.
* Rework tests
* Add map and fixed types conversion
* Fix checkstyle
* Use valid parameters
* Test record nullable field
---
.../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 112 ++-
.../sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java | 1037 ++++++++++++--------
2 files changed, 746 insertions(+), 403 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index cddde05b194..1af44ba7a01 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -34,6 +34,8 @@ import java.time.format.DateTimeFormatterBuilder;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
@@ -50,14 +52,14 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-/**
- * A set of utilities for working with Avro files.
- *
- * <p>These utilities are based on the <a
href="https://avro.apache.org/docs/1.8.1/spec.html">Avro
- * 1.8.1</a> specification.
- */
+/** A set of utilities for working with Avro files. */
class BigQueryAvroUtils {
+ private static final String VERSION_AVRO =
+ Optional.ofNullable(Schema.class.getPackage())
+ .map(Package::getImplementationVersion)
+ .orElse("");
+
// org.apache.avro.LogicalType
static class DateTimeLogicalType extends LogicalType {
public DateTimeLogicalType() {
@@ -74,6 +76,8 @@ class BigQueryAvroUtils {
* export</a>
* @see <a
href=https://cloud.google.com/bigquery/docs/reference/storage#avro_schema_details>BQ
* avro storage</a>
+ * @see <a
href=https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro>BQ
avro
+ * load</a>
*/
static Schema getPrimitiveType(TableFieldSchema schema, Boolean
useAvroLogicalTypes) {
String bqType = schema.getType();
@@ -116,6 +120,9 @@ class BigQueryAvroUtils {
}
case "DATETIME":
if (useAvroLogicalTypes) {
+ // BQ export uses a custom logical type
+ // TODO for load/storage use
+ // LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())
return
DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType());
} else {
return SchemaBuilder.builder().stringBuilder().prop("sqlType",
bqType).endString();
@@ -158,6 +165,12 @@ class BigQueryAvroUtils {
@VisibleForTesting
static String formatTimestamp(Long timestampMicro) {
+ String dateTime = formatDatetime(timestampMicro);
+ return dateTime + " UTC";
+ }
+
+ @VisibleForTesting
+ static String formatDatetime(Long timestampMicro) {
// timestampMicro is in "microseconds since epoch" format,
// e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
// Separate into seconds and microseconds.
@@ -168,11 +181,13 @@ class BigQueryAvroUtils {
timestampSec -= 1;
}
String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
-
if (micros == 0) {
- return String.format("%s UTC", dayAndTime);
+ return dayAndTime;
+ } else if (micros % 1000 == 0) {
+ return String.format("%s.%03d", dayAndTime, micros / 1000);
+ } else {
+ return String.format("%s.%06d", dayAndTime, micros);
}
- return String.format("%s.%06d UTC", dayAndTime, micros);
}
/**
@@ -274,8 +289,7 @@ class BigQueryAvroUtils {
case UNION:
return convertNullableField(name, schema, v);
case MAP:
- throw new UnsupportedOperationException(
- String.format("Unexpected Avro field schema type %s for field
named %s", type, name));
+ return convertMapField(name, schema, v);
default:
return convertRequiredField(name, schema, v);
}
@@ -296,6 +310,26 @@ class BigQueryAvroUtils {
return values;
}
+ private static List<TableRow> convertMapField(String name, Schema map,
Object v) {
+ // Avro maps are represented as key/value RECORD.
+ if (v == null) {
+ // Handle the case of an empty map.
+ return new ArrayList<>();
+ }
+
+ Schema type = map.getValueType();
+ Map<String, Object> elements = (Map<String, Object>) v;
+ ArrayList<TableRow> values = new ArrayList<>();
+ for (Map.Entry<String, Object> element : elements.entrySet()) {
+ TableRow row =
+ new TableRow()
+ .set("key", element.getKey())
+ .set("value", convertRequiredField(name, type,
element.getValue()));
+ values.add(row);
+ }
+ return values;
+ }
+
private static Object convertRequiredField(String name, Schema schema,
Object v) {
// REQUIRED fields are represented as the corresponding Avro types. For
example, a BigQuery
// INTEGER type maps to an Avro LONG type.
@@ -305,45 +339,83 @@ class BigQueryAvroUtils {
LogicalType logicalType = schema.getLogicalType();
switch (type) {
case BOOLEAN:
- // SQL types BOOL, BOOLEAN
+ // SQL type BOOL (BOOLEAN)
return v;
case INT:
if (logicalType instanceof LogicalTypes.Date) {
- // SQL types DATE
+ // SQL type DATE
+ // ideally LocalDate but TableRowJsonCoder encodes as String
return formatDate((Integer) v);
+ } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+ // Write only: SQL type TIME
+ // ideally LocalTime but TableRowJsonCoder encodes as String
+ return formatTime(((Integer) v) * 1000L);
} else {
- throw new UnsupportedOperationException(
- String.format("Unexpected Avro field schema type %s for field
named %s", type, name));
+ // Write only: SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT,
TINYINT, BYTEINT)
+ // ideally Integer but keep consistency with BQ JSON export that
uses String
+ return ((Integer) v).toString();
}
case LONG:
if (logicalType instanceof LogicalTypes.TimeMicros) {
- // SQL types TIME
+ // SQL type TIME
+ // ideally LocalTime but TableRowJsonCoder encodes as String
return formatTime((Long) v);
+ } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+ // Write only: SQL type TIMESTAMP
+ // ideally Instant but TableRowJsonCoder encodes as String
+ return formatTimestamp((Long) v * 1000L);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
- // SQL types TIMESTAMP
+ // SQL type TIMESTAMP
+ // ideally Instant but TableRowJsonCoder encodes as String
return formatTimestamp((Long) v);
+ } else if (!(VERSION_AVRO.startsWith("1.8") ||
VERSION_AVRO.startsWith("1.9"))
+ && logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+ // Write only: SQL type DATETIME
+ // ideally LocalDateTime but TableRowJsonCoder encodes as String
+ return formatDatetime(((Long) v) * 1000);
+ } else if (!(VERSION_AVRO.startsWith("1.8") ||
VERSION_AVRO.startsWith("1.9"))
+ && logicalType instanceof LogicalTypes.LocalTimestampMicros) {
+ // Write only: SQL type DATETIME
+ // ideally LocalDateTime but TableRowJsonCoder encodes as String
+ return formatDatetime((Long) v);
} else {
- // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+ // SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+ // ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ
JSON export that uses
+ // String
return ((Long) v).toString();
}
+ case FLOAT:
+ // Write only: SQL type FLOAT64
+ // ideally Float but TableRowJsonCoder decodes as Double
+ return Double.valueOf(v.toString());
case DOUBLE:
- // SQL types FLOAT64
+ // SQL type FLOAT64
return v;
case BYTES:
if (logicalType instanceof LogicalTypes.Decimal) {
// SQL type NUMERIC, BIGNUMERIC
+ // ideally BigDecimal but TableRowJsonCoder encodes as String
return new Conversions.DecimalConversion()
.fromBytes((ByteBuffer) v, schema, logicalType)
.toString();
} else {
- // SQL types BYTES
+ // SQL type BYTES
+ // ideally byte[] but TableRowJsonCoder encodes as String
return BaseEncoding.base64().encode(((ByteBuffer) v).array());
}
case STRING:
// SQL types STRING, DATETIME, GEOGRAPHY, JSON
// when not using logical type DATE, TIME too
return v.toString();
+ case ENUM:
+ // SQL types STRING
+ return v.toString();
+ case FIXED:
+ // SQL type BYTES
+ // ideally byte[] but TableRowJsonCoder encodes as String
+ return BaseEncoding.base64().encode(((ByteBuffer) v).array());
case RECORD:
+ // SQL types RECORD
return convertGenericRecordToTableRow((GenericRecord) v);
default:
throw new UnsupportedOperationException(
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index 662f2658eb6..2333278a11f 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -28,23 +28,23 @@ import com.google.api.services.bigquery.model.TableSchema;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
+import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.AvroSchema;
-import org.apache.avro.reflect.Nullable;
-import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.util.Utf8;
-import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
import org.junit.Test;
@@ -54,363 +54,678 @@ import org.junit.runners.JUnit4;
/** Tests for {@link BigQueryAvroUtils}. */
@RunWith(JUnit4.class)
public class BigQueryAvroUtilsTest {
- private List<TableFieldSchema> subFields =
- Lists.newArrayList(
- new
TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"));
- /*
- * Note that the quality and quantity fields do not have their mode set, so
they should default
- * to NULLABLE. This is an important test of BigQuery semantics.
- *
- * All the other fields we set in this function are required on the Schema
response.
- *
- * See https://cloud.google.com/bigquery/docs/reference/v2/tables#schema
- */
- private List<TableFieldSchema> fields =
- Lists.newArrayList(
- new
TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED"),
- new
TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"),
- new TableFieldSchema().setName("quality").setType("FLOAT") /*
default to NULLABLE */,
- new TableFieldSchema().setName("quantity").setType("INTEGER") /*
default to NULLABLE */,
- new
TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"),
- new
TableFieldSchema().setName("birthdayMoney").setType("NUMERIC").setMode("NULLABLE"),
- new TableFieldSchema()
- .setName("lotteryWinnings")
- .setType("BIGNUMERIC")
- .setMode("NULLABLE"),
- new
TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"),
- new
TableFieldSchema().setName("sound").setType("BYTES").setMode("NULLABLE"),
- new
TableFieldSchema().setName("anniversaryDate").setType("DATE").setMode("NULLABLE"),
- new TableFieldSchema()
- .setName("anniversaryDatetime")
- .setType("DATETIME")
- .setMode("NULLABLE"),
- new
TableFieldSchema().setName("anniversaryTime").setType("TIME").setMode("NULLABLE"),
- new TableFieldSchema()
- .setName("scion")
- .setType("RECORD")
- .setMode("NULLABLE")
- .setFields(subFields),
- new TableFieldSchema()
- .setName("associates")
- .setType("RECORD")
- .setMode("REPEATED")
- .setFields(subFields),
- new
TableFieldSchema().setName("geoPositions").setType("GEOGRAPHY").setMode("NULLABLE"));
-
- private ByteBuffer convertToBytes(BigDecimal bigDecimal, int precision, int
scale) {
- LogicalType bigDecimalLogicalType = LogicalTypes.decimal(precision, scale);
- return new Conversions.DecimalConversion().toBytes(bigDecimal, null,
bigDecimalLogicalType);
+
+ private TableSchema tableSchema(Function<TableFieldSchema, TableFieldSchema>
fn) {
+ TableFieldSchema column = new TableFieldSchema().setName("value");
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setFields(Lists.newArrayList(fn.apply(column)));
+ return tableSchema;
+ }
+
+ private Schema avroSchema(
+ Function<SchemaBuilder.FieldBuilder<Schema>,
SchemaBuilder.FieldAssembler<Schema>> fn) {
+ return fn.apply(
+ SchemaBuilder.record("root")
+ .namespace("org.apache.beam.sdk.io.gcp.bigquery")
+ .doc("Translated Avro Schema for root")
+ .fields()
+ .name("value"))
+ .endRecord();
}
+ @SuppressWarnings("JavaInstantGetSecondsGetNano")
@Test
- public void testConvertGenericRecordToTableRow() throws Exception {
- BigDecimal numeric = new BigDecimal("123456789.123456789");
- ByteBuffer numericBytes = convertToBytes(numeric, 38, 9);
- BigDecimal bigNumeric =
- new BigDecimal(
-
"578960446186580977117854925043439539266.34992332820282019728792003956564819967");
- ByteBuffer bigNumericBytes = convertToBytes(bigNumeric, 77, 38);
- Schema avroSchema = ReflectData.get().getSchema(Bird.class);
-
- {
- // Test nullable fields.
- GenericRecord record = new GenericData.Record(avroSchema);
- record.put("number", 5L);
- TableRow convertedRow =
BigQueryAvroUtils.convertGenericRecordToTableRow(record);
- TableRow row = new TableRow().set("number", "5").set("associates", new
ArrayList<TableRow>());
- assertEquals(row, convertedRow);
- TableRow clonedRow = convertedRow.clone();
- assertEquals(convertedRow, clonedRow);
- }
- {
- // Test type conversion for:
- // INTEGER, FLOAT, NUMERIC, TIMESTAMP, BOOLEAN, BYTES, DATE, DATETIME,
TIME.
- GenericRecord record = new GenericData.Record(avroSchema);
- byte[] soundBytes = "chirp,chirp".getBytes(StandardCharsets.UTF_8);
- ByteBuffer soundByteBuffer = ByteBuffer.wrap(soundBytes);
- soundByteBuffer.rewind();
- record.put("number", 5L);
- record.put("quality", 5.0);
- record.put("birthday", 5L);
- record.put("birthdayMoney", numericBytes);
- record.put("lotteryWinnings", bigNumericBytes);
- record.put("flighted", Boolean.TRUE);
- record.put("sound", soundByteBuffer);
- record.put("anniversaryDate", new Utf8("2000-01-01"));
- record.put("anniversaryDatetime", new String("2000-01-01
00:00:00.000005"));
- record.put("anniversaryTime", new Utf8("00:00:00.000005"));
- record.put("geoPositions", new String("LINESTRING(1 2, 3 4, 5 6, 7 8)"));
- TableRow convertedRow =
BigQueryAvroUtils.convertGenericRecordToTableRow(record);
- TableRow row =
- new TableRow()
- .set("number", "5")
- .set("birthday", "1970-01-01 00:00:00.000005 UTC")
- .set("birthdayMoney", numeric.toString())
- .set("lotteryWinnings", bigNumeric.toString())
- .set("quality", 5.0)
- .set("associates", new ArrayList<TableRow>())
- .set("flighted", Boolean.TRUE)
- .set("sound", BaseEncoding.base64().encode(soundBytes))
- .set("anniversaryDate", "2000-01-01")
- .set("anniversaryDatetime", "2000-01-01 00:00:00.000005")
- .set("anniversaryTime", "00:00:00.000005")
- .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)");
- TableRow clonedRow = convertedRow.clone();
- assertEquals(convertedRow, clonedRow);
- assertEquals(row, convertedRow);
- }
- {
- // Test repeated fields.
- Schema subBirdSchema = AvroCoder.of(Bird.SubBird.class).getSchema();
- GenericRecord nestedRecord = new GenericData.Record(subBirdSchema);
- nestedRecord.put("species", "other");
- GenericRecord record = new GenericData.Record(avroSchema);
- record.put("number", 5L);
- record.put("associates", Lists.newArrayList(nestedRecord));
- record.put("birthdayMoney", numericBytes);
- record.put("lotteryWinnings", bigNumericBytes);
- TableRow convertedRow =
BigQueryAvroUtils.convertGenericRecordToTableRow(record);
- TableRow row =
+ public void testConvertGenericRecordToTableRow() {
+ {
+ // bool
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type().booleanType().noDefault()))
+ .set("value", false)
+ .build();
+ TableRow expected = new TableRow().set("value", false);
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // int
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type().intType().noDefault()))
+ .set("value", 5)
+ .build();
+ TableRow expected = new TableRow().set("value", "5");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // long
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type().longType().noDefault()))
+ .set("value", 5L)
+ .build();
+ TableRow expected = new TableRow().set("value", "5");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // float
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type().floatType().noDefault()))
+ .set("value", 5.5f)
+ .build();
+ TableRow expected = new TableRow().set("value", 5.5);
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // double
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type().doubleType().noDefault()))
+ .set("value", 5.55)
+ .build();
+ TableRow expected = new TableRow().set("value", 5.55);
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // bytes
+ byte[] bytes = "chirp,chirp".getBytes(StandardCharsets.UTF_8);
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type().bytesType().noDefault()))
+ .set("value", bb)
+ .build();
+ TableRow expected = new TableRow().set("value",
BaseEncoding.base64().encode(bytes));
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // string
+ Schema schema = avroSchema(f -> f.type().stringType().noDefault());
+ GenericRecord record = new GenericRecordBuilder(schema).set("value",
"test").build();
+ GenericRecord utf8Record =
+ new GenericRecordBuilder(schema).set("value", new
Utf8("test")).build();
+ TableRow expected = new TableRow().set("value", "test");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+ TableRow utf8Row =
BigQueryAvroUtils.convertGenericRecordToTableRow(utf8Record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ assertEquals(expected, utf8Row);
+ assertEquals(expected, utf8Row.clone());
+ }
+
+ {
+ // decimal
+ LogicalType lt = LogicalTypes.decimal(38, 9);
+ Schema decimalType = lt.addToSchema(SchemaBuilder.builder().bytesType());
+ BigDecimal bd = new BigDecimal("123456789.123456789");
+ ByteBuffer bytes = new Conversions.DecimalConversion().toBytes(bd, null,
lt);
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type(decimalType).noDefault()))
+ .set("value", bytes)
+ .build();
+ TableRow expected = new TableRow().set("value", bd.toString());
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // date
+ LogicalType lt = LogicalTypes.date();
+ Schema dateType = lt.addToSchema(SchemaBuilder.builder().intType());
+ LocalDate date = LocalDate.of(2000, 1, 1);
+ int days = (int) date.toEpochDay();
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type(dateType).noDefault()))
+ .set("value", days)
+ .build();
+ TableRow expected = new TableRow().set("value", "2000-01-01");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // time-millis
+ LogicalType lt = LogicalTypes.timeMillis();
+ Schema timeType = lt.addToSchema(SchemaBuilder.builder().intType());
+ LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+ int millis = (int) (time.toNanoOfDay() / 1000000);
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type(timeType).noDefault()))
+ .set("value", millis)
+ .build();
+ TableRow expected = new TableRow().set("value", "01:02:03.123");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // time-micros
+ LogicalType lt = LogicalTypes.timeMicros();
+ Schema timeType = lt.addToSchema(SchemaBuilder.builder().longType());
+ LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+ long micros = time.toNanoOfDay() / 1000;
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type(timeType).noDefault()))
+ .set("value", micros)
+ .build();
+ TableRow expected = new TableRow().set("value", "01:02:03.123456");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // local-timestamp-millis
+ LogicalType lt = LogicalTypes.localTimestampMillis();
+ Schema timestampType =
lt.addToSchema(SchemaBuilder.builder().longType());
+ LocalDate date = LocalDate.of(2000, 1, 1);
+ LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+ LocalDateTime ts = LocalDateTime.of(date, time);
+ long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type(timestampType).noDefault()))
+ .set("value", millis)
+ .build();
+ TableRow expected = new TableRow().set("value", "2000-01-01
01:02:03.123");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // local-timestamp-micros
+ LogicalType lt = LogicalTypes.localTimestampMicros();
+ Schema timestampType =
lt.addToSchema(SchemaBuilder.builder().longType());
+ LocalDate date = LocalDate.of(2000, 1, 1);
+ LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+ LocalDateTime ts = LocalDateTime.of(date, time);
+ long seconds = ts.toInstant(ZoneOffset.UTC).getEpochSecond();
+ int nanos = ts.toInstant(ZoneOffset.UTC).getNano();
+ long micros = seconds * 1000000 + (nanos / 1000);
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type(timestampType).noDefault()))
+ .set("value", micros)
+ .build();
+ TableRow expected = new TableRow().set("value", "2000-01-01
01:02:03.123456");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+      // timestamp-millis
+ LogicalType lt = LogicalTypes.timestampMillis();
+ Schema timestampType =
lt.addToSchema(SchemaBuilder.builder().longType());
+ LocalDate date = LocalDate.of(2000, 1, 1);
+ LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+ LocalDateTime ts = LocalDateTime.of(date, time);
+ long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type(timestampType).noDefault()))
+ .set("value", millis)
+ .build();
+ TableRow expected = new TableRow().set("value", "2000-01-01 01:02:03.123
UTC");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+      // timestamp-micros
+ LogicalType lt = LogicalTypes.timestampMicros();
+ Schema timestampType =
lt.addToSchema(SchemaBuilder.builder().longType());
+ LocalDate date = LocalDate.of(2000, 1, 1);
+ LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+ LocalDateTime ts = LocalDateTime.of(date, time);
+ long seconds = ts.toInstant(ZoneOffset.UTC).getEpochSecond();
+ int nanos = ts.toInstant(ZoneOffset.UTC).getNano();
+ long micros = seconds * 1000000 + (nanos / 1000);
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type(timestampType).noDefault()))
+ .set("value", micros)
+ .build();
+ TableRow expected = new TableRow().set("value", "2000-01-01
01:02:03.123456 UTC");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // enum
+ Schema enumSchema = SchemaBuilder.enumeration("color").symbols("red",
"green", "blue");
+ GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(enumSchema,
"RED");
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type(enumSchema).noDefault()))
+ .set("value", symbol)
+ .build();
+ TableRow expected = new TableRow().set("value", "RED");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // fixed
+ UUID uuid = UUID.randomUUID();
+ ByteBuffer bb = ByteBuffer.allocate(16);
+ bb.putLong(uuid.getMostSignificantBits());
+ bb.putLong(uuid.getLeastSignificantBits());
+ bb.rewind();
+ byte[] bytes = bb.array();
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type().fixed("uuid").size(16).noDefault()))
+ .set("value", bb)
+ .build();
+ TableRow expected = new TableRow().set("value",
BaseEncoding.base64().encode(bytes));
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // null
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type().optional().booleanType())).build();
+ TableRow expected = new TableRow();
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // array
+ GenericRecord record =
+ new GenericRecordBuilder(
+ avroSchema(f ->
f.type().array().items().booleanType().noDefault()))
+ .set("value", Lists.newArrayList(true, false))
+ .build();
+ TableRow expected = new TableRow().set("value", Lists.newArrayList(true,
false));
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // map
+ Map<String, Integer> map = new HashMap<>();
+ map.put("left", 1);
+ map.put("right", -1);
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type().map().values().intType().noDefault()))
+ .set("value", map)
+ .build();
+ TableRow expected =
new TableRow()
- .set("associates", Lists.newArrayList(new
TableRow().set("species", "other")))
- .set("number", "5")
- .set("birthdayMoney", numeric.toString())
- .set("lotteryWinnings", bigNumeric.toString());
- assertEquals(row, convertedRow);
- TableRow clonedRow = convertedRow.clone();
- assertEquals(convertedRow, clonedRow);
+ .set(
+ "value",
+ Lists.newArrayList(
+ new TableRow().set("key", "left").set("value", "1"),
+ new TableRow().set("key", "right").set("value", "-1")));
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
+ {
+ // record
+ Schema subSchema =
+ SchemaBuilder.builder()
+ .record("record")
+ .fields()
+ .name("int")
+ .type()
+ .intType()
+ .noDefault()
+ .name("float")
+ .type()
+ .floatType()
+ .noDefault()
+ .endRecord();
+ GenericRecord subRecord =
+ new GenericRecordBuilder(subSchema).set("int", 5).set("float",
5.5f).build();
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f ->
f.type(subSchema).noDefault()))
+ .set("value", subRecord)
+ .build();
+ TableRow expected =
+ new TableRow().set("value", new TableRow().set("int",
"5").set("float", 5.5));
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
}
}
@Test
public void testConvertBigQuerySchemaToAvroSchema() {
- TableSchema tableSchema = new TableSchema();
- tableSchema.setFields(fields);
- Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema);
+ {
+ // REQUIRED
+ TableSchema tableSchema = tableSchema(f ->
f.setType("BOOLEAN").setMode("REQUIRED"));
+ Schema expected = avroSchema(f -> f.type().booleanType().noDefault());
- assertThat(avroSchema.getField("number").schema(),
equalTo(Schema.create(Type.LONG)));
- assertThat(
- avroSchema.getField("species").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL),
Schema.create(Type.STRING))));
- assertThat(
- avroSchema.getField("quality").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL),
Schema.create(Type.DOUBLE))));
- assertThat(
- avroSchema.getField("quantity").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL),
Schema.create(Type.LONG))));
- assertThat(
- avroSchema.getField("birthday").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Type.NULL),
-
LogicalTypes.timestampMicros().addToSchema(Schema.create(Type.LONG)))));
- assertThat(
- avroSchema.getField("birthdayMoney").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Type.NULL),
- LogicalTypes.decimal(38,
9).addToSchema(Schema.create(Type.BYTES)))));
- assertThat(
- avroSchema.getField("lotteryWinnings").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Type.NULL),
- LogicalTypes.decimal(77,
38).addToSchema(Schema.create(Type.BYTES)))));
- assertThat(
- avroSchema.getField("flighted").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL),
Schema.create(Type.BOOLEAN))));
- assertThat(
- avroSchema.getField("sound").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL),
Schema.create(Type.BYTES))));
- Schema dateSchema = Schema.create(Type.INT);
- LogicalTypes.date().addToSchema(dateSchema);
- assertThat(
- avroSchema.getField("anniversaryDate").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL), dateSchema)));
- Schema dateTimeSchema = Schema.create(Type.STRING);
- BigQueryAvroUtils.DATETIME_LOGICAL_TYPE.addToSchema(dateTimeSchema);
- assertThat(
- avroSchema.getField("anniversaryDatetime").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL), dateTimeSchema)));
- Schema timeSchema = Schema.create(Type.LONG);
- LogicalTypes.timeMicros().addToSchema(timeSchema);
- assertThat(
- avroSchema.getField("anniversaryTime").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL), timeSchema)));
- Schema geoSchema = Schema.create(Type.STRING);
- geoSchema.addProp("sqlType", "GEOGRAPHY");
- assertThat(
- avroSchema.getField("geoPositions").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL), geoSchema)));
- assertThat(
- avroSchema.getField("scion").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Type.NULL),
- Schema.createRecord(
- "scion",
- "Translated Avro Schema for scion",
- "org.apache.beam.sdk.io.gcp.bigquery",
- false,
- ImmutableList.of(
- new Field(
- "species",
- Schema.createUnion(
- Schema.create(Type.NULL),
Schema.create(Type.STRING)),
- null,
- (Object) null))))));
- assertThat(
- avroSchema.getField("associates").schema(),
- equalTo(
- Schema.createArray(
- Schema.createRecord(
- "associates",
- "Translated Avro Schema for associates",
- "org.apache.beam.sdk.io.gcp.bigquery",
- false,
- ImmutableList.of(
- new Field(
- "species",
- Schema.createUnion(
- Schema.create(Type.NULL),
Schema.create(Type.STRING)),
- null,
- (Object) null))))));
- }
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ }
- @Test
- public void testConvertBigQuerySchemaToAvroSchemaWithoutLogicalTypes() {
- TableSchema tableSchema = new TableSchema();
- tableSchema.setFields(fields);
- Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema,
false);
+ {
+ // NULLABLE
+ TableSchema tableSchema = tableSchema(f ->
f.setType("BOOLEAN").setMode("NULLABLE"));
+ Schema expected =
+ avroSchema(f ->
f.type().unionOf().nullType().and().booleanType().endUnion().noDefault());
- assertThat(avroSchema.getField("number").schema(),
equalTo(Schema.create(Schema.Type.LONG)));
- assertThat(
- avroSchema.getField("species").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.STRING))));
- assertThat(
- avroSchema.getField("quality").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.DOUBLE))));
- assertThat(
- avroSchema.getField("quantity").schema(),
- equalTo(
- Schema.createUnion(Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.LONG))));
- assertThat(
- avroSchema.getField("birthday").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Schema.Type.NULL),
-
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))));
- assertThat(
- avroSchema.getField("birthdayMoney").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Schema.Type.NULL),
- LogicalTypes.decimal(38,
9).addToSchema(Schema.create(Schema.Type.BYTES)))));
- assertThat(
- avroSchema.getField("lotteryWinnings").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Schema.Type.NULL),
- LogicalTypes.decimal(77,
38).addToSchema(Schema.create(Schema.Type.BYTES)))));
- assertThat(
- avroSchema.getField("flighted").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.BOOLEAN))));
- assertThat(
- avroSchema.getField("sound").schema(),
- equalTo(
- Schema.createUnion(Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.BYTES))));
- Schema dateSchema = Schema.create(Schema.Type.STRING);
- dateSchema.addProp("sqlType", "DATE");
- assertThat(
- avroSchema.getField("anniversaryDate").schema(),
- equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL),
dateSchema)));
- Schema dateTimeSchema = Schema.create(Schema.Type.STRING);
- dateTimeSchema.addProp("sqlType", "DATETIME");
- assertThat(
- avroSchema.getField("anniversaryDatetime").schema(),
- equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL),
dateTimeSchema)));
- Schema timeSchema = Schema.create(Schema.Type.STRING);
- timeSchema.addProp("sqlType", "TIME");
- assertThat(
- avroSchema.getField("anniversaryTime").schema(),
- equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL),
timeSchema)));
- Schema geoSchema = Schema.create(Type.STRING);
- geoSchema.addProp("sqlType", "GEOGRAPHY");
- assertThat(
- avroSchema.getField("geoPositions").schema(),
- equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL),
geoSchema)));
- assertThat(
- avroSchema.getField("scion").schema(),
- equalTo(
- Schema.createUnion(
- Schema.create(Schema.Type.NULL),
- Schema.createRecord(
- "scion",
- "Translated Avro Schema for scion",
- "org.apache.beam.sdk.io.gcp.bigquery",
- false,
- ImmutableList.of(
- new Schema.Field(
- "species",
- Schema.createUnion(
- Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.STRING)),
- null,
- (Object) null))))));
- assertThat(
- avroSchema.getField("associates").schema(),
- equalTo(
- Schema.createArray(
- Schema.createRecord(
- "associates",
- "Translated Avro Schema for associates",
- "org.apache.beam.sdk.io.gcp.bigquery",
- false,
- ImmutableList.of(
- new Schema.Field(
- "species",
- Schema.createUnion(
- Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.STRING)),
- null,
- (Object) null))))));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ }
+
+ {
+ // default mode -> NULLABLE
+ TableSchema tableSchema = tableSchema(f -> f.setType("BOOLEAN"));
+ Schema expected =
+ avroSchema(f ->
f.type().unionOf().nullType().and().booleanType().endUnion().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ }
+
+ {
+ // REPEATED
+ TableSchema tableSchema = tableSchema(f ->
f.setType("BOOLEAN").setMode("REPEATED"));
+ Schema expected = avroSchema(f ->
f.type().array().items().booleanType().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ }
+
+ {
+ // INTEGER
+ TableSchema tableSchema = tableSchema(f ->
f.setType("INTEGER").setMode("REQUIRED"));
+ Schema expected = avroSchema(f -> f.type().longType().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ }
+
+ {
+ // FLOAT
+ TableSchema tableSchema = tableSchema(f ->
f.setType("FLOAT").setMode("REQUIRED"));
+ Schema expected = avroSchema(f -> f.type().doubleType().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // BYTES
+ TableSchema tableSchema = tableSchema(f ->
f.setType("BYTES").setMode("REQUIRED"));
+ Schema expected = avroSchema(f -> f.type().bytesType().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // STRING
+ TableSchema tableSchema = tableSchema(f ->
f.setType("STRING").setMode("REQUIRED"));
+ Schema expected = avroSchema(f -> f.type().stringType().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // NUMERIC
+ TableSchema tableSchema = tableSchema(f ->
f.setType("NUMERIC").setMode("REQUIRED"));
+ Schema decimalType =
+ LogicalTypes.decimal(38,
9).addToSchema(SchemaBuilder.builder().bytesType());
+ Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // NUMERIC with precision
+ TableSchema tableSchema =
+ tableSchema(f ->
f.setType("NUMERIC").setPrecision(29L).setMode("REQUIRED"));
+ Schema decimalType =
+ LogicalTypes.decimal(29,
0).addToSchema(SchemaBuilder.builder().bytesType());
+ Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // NUMERIC with precision and scale
+ TableSchema tableSchema =
+ tableSchema(f ->
f.setType("NUMERIC").setPrecision(10L).setScale(9L).setMode("REQUIRED"));
+ Schema decimalType =
+ LogicalTypes.decimal(10,
9).addToSchema(SchemaBuilder.builder().bytesType());
+ Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // BIGNUMERIC
+ TableSchema tableSchema = tableSchema(f ->
f.setType("BIGNUMERIC").setMode("REQUIRED"));
+ Schema decimalType =
+ LogicalTypes.decimal(77,
38).addToSchema(SchemaBuilder.builder().bytesType());
+ Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // BIGNUMERIC with precision
+ TableSchema tableSchema =
+ tableSchema(f ->
f.setType("BIGNUMERIC").setPrecision(38L).setMode("REQUIRED"));
+ Schema decimalType =
+ LogicalTypes.decimal(38,
0).addToSchema(SchemaBuilder.builder().bytesType());
+ Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // BIGNUMERIC with precision and scale
+ TableSchema tableSchema =
+ tableSchema(
+ f ->
f.setType("BIGNUMERIC").setPrecision(39L).setScale(38L).setMode("REQUIRED"));
+ Schema decimalType =
+ LogicalTypes.decimal(39,
38).addToSchema(SchemaBuilder.builder().bytesType());
+ Schema expected = avroSchema(f -> f.type(decimalType).noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // DATE
+ TableSchema tableSchema = tableSchema(f ->
f.setType("DATE").setMode("REQUIRED"));
+ Schema dateType =
LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+ Schema expected = avroSchema(f -> f.type(dateType).noDefault());
+ Schema expectedExport =
+ avroSchema(f -> f.type().stringBuilder().prop("sqlType",
"DATE").endString().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expectedExport,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // TIME
+ TableSchema tableSchema = tableSchema(f ->
f.setType("TIME").setMode("REQUIRED"));
+ Schema timeType =
LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType());
+ Schema expected = avroSchema(f -> f.type(timeType).noDefault());
+ Schema expectedExport =
+ avroSchema(f -> f.type().stringBuilder().prop("sqlType",
"TIME").endString().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expectedExport,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // DATETIME
+ TableSchema tableSchema = tableSchema(f ->
f.setType("DATETIME").setMode("REQUIRED"));
+ Schema timeType =
+
BigQueryAvroUtils.DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType());
+ Schema expected = avroSchema(f -> f.type(timeType).noDefault());
+ Schema expectedExport =
+ avroSchema(
+ f -> f.type().stringBuilder().prop("sqlType",
"DATETIME").endString().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expectedExport,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // TIMESTAMP
+ TableSchema tableSchema = tableSchema(f ->
f.setType("TIMESTAMP").setMode("REQUIRED"));
+ Schema timestampType =
+
LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+ Schema expected = avroSchema(f -> f.type(timestampType).noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // GEOGRAPHY
+ TableSchema tableSchema = tableSchema(f ->
f.setType("GEOGRAPHY").setMode("REQUIRED"));
+ Schema expected =
+ avroSchema(
+ f -> f.type().stringBuilder().prop("sqlType",
"GEOGRAPHY").endString().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // JSON
+ TableSchema tableSchema = tableSchema(f ->
f.setType("JSON").setMode("REQUIRED"));
+ Schema expected =
+ avroSchema(f -> f.type().stringBuilder().prop("sqlType",
"JSON").endString().noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false));
+ }
+
+ {
+ // STRUCT/RECORD
+ TableFieldSchema subInteger =
+ new
TableFieldSchema().setName("int").setType("INTEGER").setMode("NULLABLE");
+ TableFieldSchema subFloat =
+ new
TableFieldSchema().setName("float").setType("FLOAT").setMode("REQUIRED");
+ TableSchema structTableSchema =
+ tableSchema(
+ f ->
+ f.setType("STRUCT")
+ .setMode("REQUIRED")
+ .setFields(Lists.newArrayList(subInteger, subFloat)));
+ TableSchema recordTableSchema =
+ tableSchema(
+ f ->
+ f.setType("RECORD")
+ .setMode("REQUIRED")
+ .setFields(Lists.newArrayList(subInteger, subFloat)));
+
+ Schema expected =
+ avroSchema(
+ f ->
+ f.type()
+ .record("value")
+ .fields()
+ .name("int")
+ .type()
+ .unionOf()
+ .nullType()
+ .and()
+ .longType()
+ .endUnion()
+ .noDefault()
+ .name("float")
+ .type()
+ .doubleType()
+ .noDefault()
+ .endRecord()
+ .noDefault());
+
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(structTableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(structTableSchema, false));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(recordTableSchema));
+ assertEquals(expected,
BigQueryAvroUtils.toGenericAvroSchema(recordTableSchema, false));
+ }
}
@Test
public void testFormatTimestamp() {
- assertThat(
- BigQueryAvroUtils.formatTimestamp(1452062291123456L),
- equalTo("2016-01-06 06:38:11.123456 UTC"));
+ long micros = 1452062291123456L;
+ String expected = "2016-01-06 06:38:11.123456";
+ assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
+ assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + "
UTC"));
}
@Test
- public void testFormatTimestampLeadingZeroesOnMicros() {
- assertThat(
- BigQueryAvroUtils.formatTimestamp(1452062291000456L),
- equalTo("2016-01-06 06:38:11.000456 UTC"));
+ public void testFormatTimestampMillis() {
+ long millis = 1452062291123L;
+ long micros = millis * 1000L;
+ String expected = "2016-01-06 06:38:11.123";
+ assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
+ assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + "
UTC"));
}
@Test
- public void testFormatTimestampTrailingZeroesOnMicros() {
- assertThat(
- BigQueryAvroUtils.formatTimestamp(1452062291123000L),
- equalTo("2016-01-06 06:38:11.123000 UTC"));
+ public void testFormatTimestampSeconds() {
+ long seconds = 1452062291L;
+ long micros = seconds * 1000L * 1000L;
+ String expected = "2016-01-06 06:38:11";
+ assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
+ assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + "
UTC"));
}
@Test
public void testFormatTimestampNegative() {
- assertThat(BigQueryAvroUtils.formatTimestamp(-1L), equalTo("1969-12-31
23:59:59.999999 UTC"));
- assertThat(
- BigQueryAvroUtils.formatTimestamp(-100_000L), equalTo("1969-12-31
23:59:59.900000 UTC"));
- assertThat(BigQueryAvroUtils.formatTimestamp(-1_000_000L),
equalTo("1969-12-31 23:59:59 UTC"));
+ assertThat(BigQueryAvroUtils.formatDatetime(-1L), equalTo("1969-12-31
23:59:59.999999"));
+ assertThat(BigQueryAvroUtils.formatDatetime(-100_000L),
equalTo("1969-12-31 23:59:59.900"));
+ assertThat(BigQueryAvroUtils.formatDatetime(-1_000_000L),
equalTo("1969-12-31 23:59:59"));
// No leap seconds before 1972. 477 leap years from 1 through 1969.
assertThat(
- BigQueryAvroUtils.formatTimestamp(-(1969L * 365 + 477) * 86400 *
1_000_000),
- equalTo("0001-01-01 00:00:00 UTC"));
+ BigQueryAvroUtils.formatDatetime(-(1969L * 365 + 477) * 86400 *
1_000_000),
+ equalTo("0001-01-01 00:00:00"));
}
@Test
@@ -501,48 +816,4 @@ public class BigQueryAvroUtilsTest {
String output = BigQueryAvroUtils.toGenericAvroSchema(schema,
false).toString();
assertThat(output.length(), greaterThan(0));
}
-
- /** Pojo class used as the record type in tests. */
- @SuppressWarnings("unused") // Used by Avro reflection.
- static class Bird {
- long number;
- @Nullable String species;
- @Nullable Double quality;
- @Nullable Long quantity;
-
- @AvroSchema(value = "[\"null\", {\"type\": \"long\", \"logicalType\":
\"timestamp-micros\"}]")
- Instant birthday;
-
- @AvroSchema(
- value =
- "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\",
\"precision\": 38, \"scale\": 9}]")
- BigDecimal birthdayMoney;
-
- @AvroSchema(
- value =
- "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\",
\"precision\": 77, \"scale\": 38}]")
- BigDecimal lotteryWinnings;
-
- @AvroSchema(value = "[\"null\", {\"type\": \"string\", \"sqlType\":
\"GEOGRAPHY\"}]")
- String geoPositions;
-
- @Nullable Boolean flighted;
- @Nullable ByteBuffer sound;
- @Nullable Utf8 anniversaryDate;
- @Nullable String anniversaryDatetime;
- @Nullable Utf8 anniversaryTime;
- @Nullable SubBird scion;
- SubBird[] associates;
-
- static class SubBird {
- @Nullable String species;
-
- public SubBird() {}
- }
-
- public Bird() {
- associates = new SubBird[1];
- associates[0] = new SubBird();
- }
- }
}