[
https://issues.apache.org/jira/browse/BEAM-5807?focusedWorklogId=157445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-157445
]
ASF GitHub Bot logged work on BEAM-5807:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Oct/18 09:26
Start Date: 23/Oct/18 09:26
Worklog Time Spent: 10m
Work Description: kanterov commented on a change in pull request #6777:
[BEAM-5807] Conversion from AVRO records to rows
URL: https://github.com/apache/beam/pull/6777#discussion_r227304230
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
##########
@@ -0,0 +1,349 @@
+package org.apache.beam.sdk.schemas.utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/** Utils to convert AVRO records to Beam rows. */
+@Experimental(Experimental.Kind.SCHEMAS)
+public class AvroUtils {
+ private AvroUtils() {}
+
+ /**
+ * Converts AVRO schema to Beam row schema.
+ *
+ * @param schema schema of type RECORD
+ */
+ public static Schema toSchema(@Nonnull org.apache.avro.Schema schema) {
+ Schema.Builder builder = Schema.builder();
+
+ for (org.apache.avro.Schema.Field field : schema.getFields()) {
+ org.apache.avro.Schema unwrapped = unwrapNullableSchema(field.schema());
+
+ if (!unwrapped.equals(field.schema())) {
+ builder.addNullableField(field.name(), toFieldType(unwrapped));
+ } else {
+ builder.addField(field.name(), toFieldType(unwrapped));
+ }
+ }
+
+ return builder.build();
+ }
+
+ /** Converts AVRO schema to Beam field. */
+ public static Schema.FieldType toFieldType(@Nonnull org.apache.avro.Schema
avroSchema) {
+ switch (avroSchema.getType()) {
+ case RECORD:
+ return Schema.FieldType.row(toSchema(avroSchema));
+
+ case ENUM:
+ return Schema.FieldType.STRING;
+
+ case ARRAY:
+ Schema.FieldType elementType =
toFieldType(avroSchema.getElementType());
+ return Schema.FieldType.array(elementType);
+
+ case MAP:
+ return Schema.FieldType.map(
+ Schema.FieldType.STRING, toFieldType(avroSchema.getValueType()));
+
+ case FIXED:
+ return Schema.FieldType.BYTES;
+
+ case STRING:
+ return Schema.FieldType.STRING;
+
+ case BYTES:
+ return Schema.FieldType.BYTES;
+
+ case INT:
+ return Schema.FieldType.INT32;
+
+ case LONG:
+ return Schema.FieldType.INT64;
+
+ case FLOAT:
+ return Schema.FieldType.FLOAT;
+
+ case DOUBLE:
+ return Schema.FieldType.DOUBLE;
+
+ case BOOLEAN:
+ return Schema.FieldType.BOOLEAN;
+
+ case UNION:
+ throw new RuntimeException("Can't convert 'union' to FieldType");
+
+ case NULL:
+ throw new RuntimeException("Can't convert 'null' to FieldType");
+
+ default:
+ throw new AssertionError("Unexpected AVRO Schema.Type: " +
avroSchema.getType());
+ }
+ }
+
+ /**
+ * Strict conversion from AVRO to Beam, strict because it doesn't do
widening or narrowing during
+ * conversion.
+ */
+ public static Row toRowStrict(@Nonnull GenericRecord record, @Nonnull Schema
schema) {
+ Row.Builder builder = Row.withSchema(schema);
+ org.apache.avro.Schema avroSchema = record.getSchema();
+
+ for (Schema.Field field : schema.getFields()) {
+ Object value = record.get(field.getName());
+ org.apache.avro.Schema fieldAvroSchema =
avroSchema.getField(field.getName()).schema();
+
+ if (value == null) {
+ builder.addValue(null);
+ } else {
+ builder.addValue(convertAvroFieldStrict(value, fieldAvroSchema,
field.getType()));
+ }
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Strict conversion from AVRO to Beam, strict because it doesn't do
widening or narrowing during
+ * conversion.
+ *
+ * @param value {@link GenericRecord} or any nested value
+ * @param avroSchema schema for value
+ * @param fieldType target beam field type
+ * @return value converted for {@link Row}
+ */
+ @SuppressWarnings("unchecked")
+ public static Object convertAvroFieldStrict(
+ @Nonnull Object value,
+ @Nonnull org.apache.avro.Schema avroSchema,
+ @Nonnull Schema.FieldType fieldType) {
+
+ org.apache.avro.Schema unwrapped = unwrapNullableSchema(avroSchema);
+
+ switch (unwrapped.getType()) {
+ case FIXED:
+ return convertFixedStrict((GenericFixed) value, fieldType);
+
+ case BYTES:
+ return convertBytesStrict((ByteBuffer) value, fieldType);
+
+ case STRING:
+ return convertStringStrict((CharSequence) value, fieldType);
+
+ case INT:
+ return convertIntStrict((Integer) value, fieldType);
+
+ case LONG:
+ return convertLongStrict((Long) value, fieldType);
+
+ case FLOAT:
+ return convertFloatStrict((Float) value, fieldType);
+
+ case DOUBLE:
+ return convertDoubleStrict((Double) value, fieldType);
+
+ case BOOLEAN:
+ return convertBooleanStrict((Boolean) value, fieldType);
+
+ case RECORD:
+ return convertRecordStrict((GenericRecord) value, fieldType);
+
+ case ENUM:
+ return convertEnumStrict((GenericEnumSymbol) value, fieldType);
+
+ case ARRAY:
+ return convertArrayStrict((List<Object>) value,
unwrapped.getElementType(), fieldType);
+
+ case MAP:
+ return convertMapStrict(
+ (Map<CharSequence, Object>) value, unwrapped.getValueType(),
fieldType);
+
+ case UNION:
+ throw new RuntimeException("Can't convert 'union', only nullable
fields are supported");
+
+ case NULL:
+ throw new RuntimeException("Can't convert 'null' to non-nullable
field");
+
+ default:
+ throw new AssertionError("Unexpected AVRO Schema.Type: " +
unwrapped.getType());
+ }
+ }
+
+ private static org.apache.avro.Schema
unwrapNullableSchema(org.apache.avro.Schema avroSchema) {
+ if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
+ org.apache.avro.Schema nullSchema =
+ org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL);
Review comment:
I was about to turn this into a constant, but then I realized that AVRO
schemas aren't really immutable, so I would rather be safe and keep creating
a new instance here. WDYT?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 157445)
Time Spent: 3h 50m (was: 3h 40m)
> Add AvroIO.readRows
> -------------------
>
> Key: BEAM-5807
> URL: https://issues.apache.org/jira/browse/BEAM-5807
> Project: Beam
> Issue Type: Improvement
> Components: dsl-sql
> Reporter: Gleb Kanterov
> Assignee: Gleb Kanterov
> Priority: Major
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> # Motivation
> At the moment, the only way to read AVRO is through code generation with
> avro-compiler and JavaBeanSchema. This makes it impossible to write
> transforms that can work with dynamic schemas. AVRO has a generic data type
> called GenericRecord, and reading it is implemented in
> AvroIO.readGenericRecords. There is code to convert GenericRecord to Row,
> shipped as part of BigQueryIO. However, it doesn't support all types or
> nested records.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)