[ 
https://issues.apache.org/jira/browse/BEAM-5807?focusedWorklogId=158213&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158213
 ]

ASF GitHub Bot logged work on BEAM-5807:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Oct/18 16:19
            Start Date: 24/Oct/18 16:19
    Worklog Time Spent: 10m 
      Work Description: akedin closed pull request #6777: [BEAM-5807] 
Conversion from AVRO records to rows
URL: https://github.com/apache/beam/pull/6777
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 4a93e1b54cf..463d2f13a20 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -261,8 +261,8 @@ class BeamModulePlugin implements Plugin<Project> {
         return
       }
 
-      mavenLocal()
       mavenCentral()
+      mavenLocal()
       jcenter()
 
       // Spring for resolving pentaho dependency.
@@ -349,6 +349,7 @@ class BeamModulePlugin implements Plugin<Project> {
         apex_engine                                 : 
"org.apache.apex:apex-engine:$apex_core_version",
         args4j                                      : "args4j:args4j:2.33",
         avro                                        : 
"org.apache.avro:avro:1.8.2",
+        avro_tests                                  : 
"org.apache.avro:avro:1.8.2:tests",
         bigdataoss_gcsio                            : 
"com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version",
         bigdataoss_util                             : 
"com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version",
         bigtable_client_core                        : 
"com.google.cloud.bigtable:bigtable-client-core:$bigtable_version",
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index 3fb2d55c802..2b976a285ef 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -75,4 +75,6 @@ dependencies {
   shadowTest library.java.hamcrest_core
   shadowTest library.java.hamcrest_library
   shadowTest "com.esotericsoftware.kryo:kryo:2.21"
+  shadowTest library.java.quickcheck_core
+  shadowTest library.java.avro_tests
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
new file mode 100644
index 00000000000..b0a76976d15
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/** Utils to convert AVRO records to Beam rows. */
+@Experimental(Experimental.Kind.SCHEMAS)
+public class AvroUtils {
+  private AvroUtils() {}
+
+  /**
+   * Converts AVRO schema to Beam row schema.
+   *
+   * <p>A field whose AVRO type is a union of {@code null} and exactly one other type becomes a
+   * nullable Beam field of that other type; every other field becomes a non-nullable field.
+   *
+   * @param schema schema of type RECORD
+   * @return Beam schema with one field per AVRO field, in the same order
+   * @throws IllegalArgumentException if a field has no Beam equivalent, e.g. a union with more
+   *     than one non-null type
+   */
+  public static Schema toSchema(@Nonnull org.apache.avro.Schema schema) {
+    Schema.Builder builder = Schema.builder();
+
+    for (org.apache.avro.Schema.Field field : schema.getFields()) {
+      org.apache.avro.Schema unwrapped = unwrapNullableSchema(field.schema());
+
+      // unwrapping only changes the schema when the field was a union containing null
+      if (!unwrapped.equals(field.schema())) {
+        builder.addNullableField(field.name(), toFieldType(unwrapped));
+      } else {
+        builder.addField(field.name(), toFieldType(unwrapped));
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Converts AVRO schema to Beam field type.
+   *
+   * <p>RECORD maps to a nested row type, ENUM to STRING, FIXED to BYTES, and MAP to a map with
+   * STRING keys.
+   *
+   * @param avroSchema schema to convert; nullable unions must be unwrapped by the caller
+   * @return the equivalent Beam field type
+   * @throws IllegalArgumentException for UNION and NULL schemas, which have no Beam equivalent
+   */
+  public static Schema.FieldType toFieldType(@Nonnull org.apache.avro.Schema avroSchema) {
+    switch (avroSchema.getType()) {
+      case RECORD:
+        return Schema.FieldType.row(toSchema(avroSchema));
+
+      case ENUM:
+        return Schema.FieldType.STRING;
+
+      case ARRAY:
+        Schema.FieldType elementType = toFieldType(avroSchema.getElementType());
+        return Schema.FieldType.array(elementType);
+
+      case MAP:
+        return Schema.FieldType.map(
+            Schema.FieldType.STRING, toFieldType(avroSchema.getValueType()));
+
+      case FIXED:
+        return Schema.FieldType.BYTES;
+
+      case STRING:
+        return Schema.FieldType.STRING;
+
+      case BYTES:
+        return Schema.FieldType.BYTES;
+
+      case INT:
+        return Schema.FieldType.INT32;
+
+      case LONG:
+        return Schema.FieldType.INT64;
+
+      case FLOAT:
+        return Schema.FieldType.FLOAT;
+
+      case DOUBLE:
+        return Schema.FieldType.DOUBLE;
+
+      case BOOLEAN:
+        return Schema.FieldType.BOOLEAN;
+
+      case UNION:
+        throw new IllegalArgumentException("Can't convert 'union' to FieldType");
+
+      case NULL:
+        throw new IllegalArgumentException("Can't convert 'null' to FieldType");
+
+      default:
+        throw new AssertionError("Unexpected AVRO Schema.Type: " + avroSchema.getType());
+    }
+  }
+
+  /**
+   * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during
+   * conversion.
+   *
+   * <p>Each field of {@code schema} is looked up by name in the record's own AVRO schema. {@code
+   * null} values are passed through unchanged.
+   *
+   * @param record AVRO record to convert
+   * @param schema target Beam schema, for example the result of {@link #toSchema}
+   * @return row holding one converted value per field of {@code schema}
+   * @throws IllegalArgumentException if a field of {@code schema} is absent from the record's
+   *     schema, or if a value's AVRO type doesn't match the target Beam type
+   */
+  public static Row toRowStrict(@Nonnull GenericRecord record, @Nonnull Schema schema) {
+    Row.Builder builder = Row.withSchema(schema);
+    org.apache.avro.Schema avroSchema = record.getSchema();
+
+    for (Schema.Field field : schema.getFields()) {
+      org.apache.avro.Schema.Field avroField = avroSchema.getField(field.getName());
+      // report mismatched schemas clearly instead of failing with a NullPointerException
+      checkArgument(
+          avroField != null,
+          "Field '%s' is not present in AVRO schema %s",
+          field.getName(),
+          avroSchema.getFullName());
+
+      Object value = record.get(avroField.pos());
+
+      if (value == null) {
+        builder.addValue(null);
+      } else {
+        builder.addValue(convertAvroFieldStrict(value, avroField.schema(), field.getType()));
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during
+   * conversion.
+   *
+   * @param value {@link GenericRecord} or any nested value
+   * @param avroSchema schema for value
+   * @param fieldType target beam field type
+   * @return value converted for {@link Row}
+   * @throws IllegalArgumentException if the AVRO type doesn't match {@code fieldType}, or if the
+   *     schema is a union that isn't just a nullable type
+   */
+  @SuppressWarnings("unchecked")
+  public static Object convertAvroFieldStrict(
+      @Nonnull Object value,
+      @Nonnull org.apache.avro.Schema avroSchema,
+      @Nonnull Schema.FieldType fieldType) {
+
+    org.apache.avro.Schema unwrapped = unwrapNullableSchema(avroSchema);
+
+    switch (unwrapped.getType()) {
+      case FIXED:
+        return convertFixedStrict((GenericFixed) value, fieldType);
+
+      case BYTES:
+        return convertBytesStrict((ByteBuffer) value, fieldType);
+
+      case STRING:
+        return convertStringStrict((CharSequence) value, fieldType);
+
+      case INT:
+        return convertIntStrict((Integer) value, fieldType);
+
+      case LONG:
+        return convertLongStrict((Long) value, fieldType);
+
+      case FLOAT:
+        return convertFloatStrict((Float) value, fieldType);
+
+      case DOUBLE:
+        return convertDoubleStrict((Double) value, fieldType);
+
+      case BOOLEAN:
+        return convertBooleanStrict((Boolean) value, fieldType);
+
+      case RECORD:
+        return convertRecordStrict((GenericRecord) value, fieldType);
+
+      case ENUM:
+        return convertEnumStrict((GenericEnumSymbol) value, fieldType);
+
+      case ARRAY:
+        return convertArrayStrict((List<Object>) value, unwrapped.getElementType(), fieldType);
+
+      case MAP:
+        return convertMapStrict(
+            (Map<CharSequence, Object>) value, unwrapped.getValueType(), fieldType);
+
+      case UNION:
+        throw new IllegalArgumentException(
+            "Can't convert 'union', only nullable fields are supported");
+
+      case NULL:
+        throw new IllegalArgumentException("Can't convert 'null' to non-nullable field");
+
+      default:
+        throw new AssertionError("Unexpected AVRO Schema.Type: " + unwrapped.getType());
+    }
+  }
+
+  /**
+   * Strips {@code null} from a union schema.
+   *
+   * <p>For a union of {@code null} and one other type, returns that other type. For a union of
+   * {@code null} and several types, returns a new union of the non-null types. Returns {@code
+   * avroSchema} itself when it isn't a union, has no {@code null} branch, or is only {@code null}.
+   */
+  @VisibleForTesting
+  static org.apache.avro.Schema unwrapNullableSchema(org.apache.avro.Schema avroSchema) {
+    if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
+      List<org.apache.avro.Schema> types = avroSchema.getTypes();
+
+      // optional fields in AVRO have form of:
+      // {"name": "foo", "type": ["null", "something"]}
+
+      // don't need recursion because nested unions aren't supported in AVRO
+      List<org.apache.avro.Schema> nonNullTypes =
+          types
+              .stream()
+              .filter(x -> x.getType() != org.apache.avro.Schema.Type.NULL)
+              .collect(Collectors.toList());
+
+      if (nonNullTypes.size() == types.size()) {
+        // union without `null`, keep as is
+        return avroSchema;
+      } else if (nonNullTypes.size() > 1) {
+        return org.apache.avro.Schema.createUnion(nonNullTypes);
+      } else if (nonNullTypes.size() == 1) {
+        return nonNullTypes.get(0);
+      } else { // nonNullTypes.size() == 0
+        return avroSchema;
+      }
+    }
+
+    return avroSchema;
+  }
+
+  // Per-type converters: each verifies the target Beam type first, then converts the value.
+
+  private static Object convertRecordStrict(GenericRecord record, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.ROW, "record");
+    return toRowStrict(record, fieldType.getRowSchema());
+  }
+
+  private static Object convertBytesStrict(ByteBuffer bb, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.BYTES, "bytes");
+
+    // Read through a duplicate so the record's buffer position is left untouched; reading the
+    // original would drain it and a second conversion of the same record would see no bytes.
+    byte[] bytes = new byte[bb.remaining()];
+    bb.duplicate().get(bytes);
+    return bytes;
+  }
+
+  private static Object convertFixedStrict(GenericFixed fixed, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.BYTES, "fixed");
+    return fixed.bytes().clone(); // clone because GenericFixed is mutable
+  }
+
+  private static Object convertStringStrict(CharSequence value, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.STRING, "string");
+    return value.toString();
+  }
+
+  private static Object convertIntStrict(Integer value, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.INT32, "int");
+    return value;
+  }
+
+  private static Object convertLongStrict(Long value, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.INT64, "long");
+    return value;
+  }
+
+  private static Object convertFloatStrict(Float value, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.FLOAT, "float");
+    return value;
+  }
+
+  private static Object convertDoubleStrict(Double value, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.DOUBLE, "double");
+    return value;
+  }
+
+  private static Object convertBooleanStrict(Boolean value, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.BOOLEAN, "boolean");
+    return value;
+  }
+
+  private static Object convertEnumStrict(GenericEnumSymbol value, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.STRING, "enum");
+    return value.toString();
+  }
+
+  private static Object convertArrayStrict(
+      List<Object> values, org.apache.avro.Schema elemAvroSchema, Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.ARRAY, "array");
+
+    List<Object> ret = new ArrayList<>(values.size());
+    Schema.FieldType elemFieldType = fieldType.getCollectionElementType();
+
+    for (Object value : values) {
+      ret.add(convertAvroFieldStrict(value, elemAvroSchema, elemFieldType));
+    }
+
+    return ret;
+  }
+
+  private static Object convertMapStrict(
+      Map<CharSequence, Object> values,
+      org.apache.avro.Schema valueAvroSchema,
+      Schema.FieldType fieldType) {
+    checkTypeName(fieldType.getTypeName(), Schema.TypeName.MAP, "map");
+    checkNotNull(fieldType.getMapKeyType());
+    checkNotNull(fieldType.getMapValueType());
+
+    // AVRO map keys are always strings
+    if (!fieldType.getMapKeyType().equals(Schema.FieldType.STRING)) {
+      throw new IllegalArgumentException(
+          "Can't convert 'string' map keys to " + fieldType.getMapKeyType());
+    }
+
+    Map<Object, Object> ret = new HashMap<>();
+
+    for (Map.Entry<CharSequence, Object> value : values.entrySet()) {
+      ret.put(
+          convertStringStrict(value.getKey(), fieldType.getMapKeyType()),
+          convertAvroFieldStrict(value.getValue(), valueAvroSchema, fieldType.getMapValueType()));
+    }
+
+    return ret;
+  }
+
+  /** Throws {@link IllegalArgumentException} unless {@code got} equals {@code expected}. */
+  private static void checkTypeName(Schema.TypeName got, Schema.TypeName expected, String label) {
+    // Runs for every converted value, so the message is templated and only built on failure.
+    checkArgument(
+        got.equals(expected), "Can't convert '%s' to %s, expected: %s", label, got, expected);
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 5094faa31e4..2ac32385ba5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -381,7 +381,7 @@ public static Builder withSchema(Schema schema) {
       this.schema = schema;
     }
 
-    public Builder addValue(Object values) {
+    public Builder addValue(@Nullable Object values) {
       this.values.add(values);
       return this;
     }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroGenerators.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroGenerators.java
new file mode 100644
index 00000000000..c2757e1c01e
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroGenerators.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ObjectArrays;
+import com.pholser.junit.quickcheck.generator.GenerationStatus;
+import com.pholser.junit.quickcheck.generator.Generator;
+import com.pholser.junit.quickcheck.random.SourceOfRandomness;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+
+/** QuickCheck generators for AVRO. */
+class AvroGenerators {
+
+  /**
+   * Generates arbitrary AVRO schemas.
+   *
+   * <p>Once {@code MAX_NESTING} non-primitive types have been generated for a {@link
+   * GenerationStatus}, only primitive types are produced. The counter is never decremented, so it
+   * bounds the total number of non-primitive types rather than the nesting depth.
+   */
+  public static class SchemaGenerator extends BaseSchemaGenerator {
+
+    public static final SchemaGenerator INSTANCE = new SchemaGenerator();
+
+    private static final ImmutableList<Schema.Type> PRIMITIVE_TYPES =
+        ImmutableList.of(
+            Schema.Type.STRING,
+            Schema.Type.BYTES,
+            Schema.Type.INT,
+            Schema.Type.LONG,
+            Schema.Type.FLOAT,
+            Schema.Type.DOUBLE,
+            Schema.Type.BOOLEAN);
+
+    // each type is listed exactly once so that none is over-represented when sampling
+    private static final ImmutableList<Schema.Type> ALL_TYPES =
+        ImmutableList.<Schema.Type>builder()
+            .addAll(PRIMITIVE_TYPES)
+            .add(Schema.Type.FIXED)
+            .add(Schema.Type.ENUM)
+            .add(Schema.Type.RECORD)
+            .add(Schema.Type.ARRAY)
+            .add(Schema.Type.MAP)
+            .add(Schema.Type.UNION)
+            .build();
+
+    private static final int MAX_NESTING = 10;
+
+    @Override
+    public Schema generate(SourceOfRandomness random, GenerationStatus status) {
+      Schema.Type type;
+
+      if (nesting(status) >= MAX_NESTING) {
+        type = random.choose(PRIMITIVE_TYPES);
+      } else {
+        type = random.choose(ALL_TYPES);
+      }
+
+      if (PRIMITIVE_TYPES.contains(type)) {
+        return Schema.create(type);
+      } else {
+        nestingInc(status);
+
+        if (type == Schema.Type.FIXED) {
+          int size = random.choose(Arrays.asList(1, 5, 12));
+          // the branch path keeps named types unique within one generated schema
+          return Schema.createFixed("fixed_" + branch(status), "", "", size);
+        } else if (type == Schema.Type.UNION) {
+          // Unions may or may not contain "null" and may have several members; callers are
+          // expected to filter out unions that row conversion can't handle.
+          return UnionSchemaGenerator.INSTANCE.generate(random, status);
+        } else if (type == Schema.Type.ENUM) {
+          return EnumSchemaGenerator.INSTANCE.generate(random, status);
+        } else if (type == Schema.Type.RECORD) {
+          return RecordSchemaGenerator.INSTANCE.generate(random, status);
+        } else if (type == Schema.Type.MAP) {
+          return Schema.createMap(generate(random, status));
+        } else if (type == Schema.Type.ARRAY) {
+          return Schema.createArray(generate(random, status));
+        } else {
+          throw new AssertionError("Unexpected AVRO type: " + type);
+        }
+      }
+    }
+  }
+
+  /**
+   * Generates RECORD schemas with at least one field of arbitrary type. Fields are named {@code
+   * field_<index>} and the record is named after the current branch path.
+   */
+  public static class RecordSchemaGenerator extends BaseSchemaGenerator {
+
+    public static final RecordSchemaGenerator INSTANCE = new RecordSchemaGenerator();
+
+    @Override
+    public Schema generate(SourceOfRandomness random, GenerationStatus status) {
+      List<Schema.Field> fields =
+          IntStream.range(0, random.nextInt(0, status.size()) + 1)
+              .mapToObj(
+                  i -> {
+                    // deterministically avoid collisions in record names
+                    branchPush(status, String.valueOf(i));
+                    Schema.Field field =
+                        createField(i, SchemaGenerator.INSTANCE.generate(random, status));
+                    branchPop(status);
+                    return field;
+                  })
+              .collect(Collectors.toList());
+
+      return Schema.createRecord("record_" + branch(status), "", "example", false, fields);
+    }
+
+    private Schema.Field createField(int i, Schema schema) {
+      return new Schema.Field("field_" + i, schema, null, (Object) null);
+    }
+  }
+
+  /**
+   * Generates UNION schemas of non-union members, deduplicated by full name, with a NULL member
+   * inserted at a random position on a coin flip.
+   */
+  static class UnionSchemaGenerator extends BaseSchemaGenerator {
+
+    public static final UnionSchemaGenerator INSTANCE = new UnionSchemaGenerator();
+
+    @Override
+    public Schema generate(SourceOfRandomness random, GenerationStatus status) {
+      Map<String, Schema> schemaMap =
+          IntStream.range(0, random.nextInt(0, status.size()) + 1)
+              .mapToObj(
+                  i -> {
+                    // deterministically avoid collisions in record names
+                    branchPush(status, String.valueOf(i));
+                    Schema schema =
+                        SchemaGenerator.INSTANCE
+                            // nested unions aren't supported in AVRO
+                            .filter(x -> x.getType() != Schema.Type.UNION)
+                            .generate(random, status);
+                    branchPop(status);
+                    return schema;
+                  })
+              // AVRO requires uniqueness by full name
+              .collect(Collectors.toMap(Schema::getFullName, Function.identity(), (x, y) -> x));
+
+      List<Schema> schemas = new ArrayList<>(schemaMap.values());
+
+      if (random.nextBoolean()) {
+        Schema nullSchema = Schema.create(Schema.Type.NULL);
+        schemas.add(nullSchema);
+        Collections.shuffle(schemas, random.toJDKRandom());
+      }
+
+      return Schema.createUnion(schemas);
+    }
+  }
+
+  /** Picks one of two fixed ENUM schemas. */
+  static class EnumSchemaGenerator extends BaseSchemaGenerator {
+
+    public static final EnumSchemaGenerator INSTANCE = new EnumSchemaGenerator();
+
+    private static final Schema FRUITS =
+        Schema.createEnum("Fruit", "", "example", Arrays.asList("banana", "apple", "pear"));
+
+    private static final Schema STATUS =
+        Schema.createEnum("Status", "", "example", Arrays.asList("OK", "ERROR", "WARNING"));
+
+    @Override
+    public Schema generate(final SourceOfRandomness random, final GenerationStatus status) {
+      return random.choose(Arrays.asList(FRUITS, STATUS));
+    }
+  }
+
+  /**
+   * Base class for the schema generators. Keeps two values in the {@link GenerationStatus}: a
+   * branch path of field/member indices used to derive unique type names, and a running count of
+   * non-primitive types generated so far.
+   */
+  abstract static class BaseSchemaGenerator extends Generator<Schema> {
+
+    private static final GenerationStatus.Key<Integer> NESTING_KEY =
+        new GenerationStatus.Key<>("nesting", Integer.class);
+
+    private static final GenerationStatus.Key<String[]> BRANCH_KEY =
+        new GenerationStatus.Key<>("branch", String[].class);
+
+    BaseSchemaGenerator() {
+      super(Schema.class);
+    }
+
+    /** Appends {@code value} to the branch path. */
+    void branchPush(GenerationStatus status, String value) {
+      String[] current = status.valueOf(BRANCH_KEY).orElse(new String[0]);
+      String[] next = ObjectArrays.concat(current, value);
+
+      status.setValue(BRANCH_KEY, next);
+    }
+
+    /** Removes the last element of the branch path; must follow a matching push. */
+    void branchPop(GenerationStatus status) {
+      String[] current = status.valueOf(BRANCH_KEY).orElse(new String[0]);
+      String[] next = Arrays.copyOf(current, current.length - 1);
+
+      status.setValue(BRANCH_KEY, next);
+    }
+
+    /** Returns the branch path joined with underscores, or an empty string at the root. */
+    String branch(GenerationStatus status) {
+      return Joiner.on("_").join(status.valueOf(BRANCH_KEY).orElse(new String[0]));
+    }
+
+    int nesting(GenerationStatus status) {
+      return status.valueOf(NESTING_KEY).orElse(0);
+    }
+
+    void nestingInc(GenerationStatus status) {
+      status.setValue(NESTING_KEY, nesting(status) + 1);
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
new file mode 100644
index 00000000000..6e88505b0d3
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeThat;
+
+import com.google.common.collect.Lists;
+import com.pholser.junit.quickcheck.From;
+import com.pholser.junit.quickcheck.Property;
+import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.avro.RandomData;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroGenerators.RecordSchemaGenerator;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/** Tests for conversion between AVRO records and Beam rows. */
+@RunWith(JUnitQuickcheck.class)
+public class AvroUtilsTest {
+
+  private static final org.apache.avro.Schema NULL_SCHEMA =
+      org.apache.avro.Schema.create(Type.NULL);
+
+  /**
+   * Any generated record schema without unsupported constructs converts to a Beam schema, and
+   * random records of that schema convert to rows without throwing.
+   */
+  @Property(trials = 1000)
+  @SuppressWarnings("unchecked")
+  public void supportsAnyAvroSchema(
+      @From(RecordSchemaGenerator.class) org.apache.avro.Schema avroSchema) {
+    // not everything is possible to translate
+    assumeThat(avroSchema, not(containsField(AvroUtilsTest::hasArrayOrMapOfNullable)));
+    assumeThat(avroSchema, not(containsField(AvroUtilsTest::hasNonNullUnion)));
+
+    Schema schema = AvroUtils.toSchema(avroSchema);
+    Iterable iterable = new RandomData(avroSchema, 10);
+    List<GenericRecord> records = Lists.newArrayList((Iterable<GenericRecord>) iterable);
+
+    for (GenericRecord record : records) {
+      AvroUtils.toRowStrict(record, schema);
+    }
+  }
+
+  @Test
+  public void testUnwrapNullableSchema() {
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createUnion(
+            org.apache.avro.Schema.create(Type.NULL), org.apache.avro.Schema.create(Type.STRING));
+
+    assertEquals(
+        org.apache.avro.Schema.create(Type.STRING), AvroUtils.unwrapNullableSchema(avroSchema));
+  }
+
+  @Test
+  public void testUnwrapNullableSchemaReordered() {
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createUnion(
+            org.apache.avro.Schema.create(Type.STRING), org.apache.avro.Schema.create(Type.NULL));
+
+    assertEquals(
+        org.apache.avro.Schema.create(Type.STRING), AvroUtils.unwrapNullableSchema(avroSchema));
+  }
+
+  @Test
+  public void testUnwrapNullableSchemaToUnion() {
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createUnion(
+            org.apache.avro.Schema.create(Type.STRING),
+            org.apache.avro.Schema.create(Type.LONG),
+            org.apache.avro.Schema.create(Type.NULL));
+
+    assertEquals(
+        org.apache.avro.Schema.createUnion(
+            org.apache.avro.Schema.create(Type.STRING), org.apache.avro.Schema.create(Type.LONG)),
+        AvroUtils.unwrapNullableSchema(avroSchema));
+  }
+
+  /** Matches a schema if it, or any type nested inside it, satisfies {@code predicate}. */
+  public static ContainsField containsField(Function<org.apache.avro.Schema, Boolean> predicate) {
+    return new ContainsField(predicate);
+  }
+
+  /**
+   * True for a union that isn't exactly {@code null} plus one other type. Such unions don't
+   * convert because Beam has no union type, only nullable fields.
+   */
+  public static boolean hasNonNullUnion(org.apache.avro.Schema schema) {
+    if (schema.getType() == Type.UNION) {
+      final List<org.apache.avro.Schema> types = schema.getTypes();
+
+      if (types.size() == 2) {
+        return !types.contains(NULL_SCHEMA);
+      } else {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * True for an array or map whose element/value type is a union containing {@code null}. These
+   * don't convert because Beam doesn't support arrays and maps of nullable types.
+   */
+  public static boolean hasArrayOrMapOfNullable(org.apache.avro.Schema schema) {
+
+    if (schema.getType() == Type.ARRAY) {
+      org.apache.avro.Schema elementType = schema.getElementType();
+      if (elementType.getType() == Type.UNION) {
+        return elementType.getTypes().contains(NULL_SCHEMA);
+      }
+    }
+
+    if (schema.getType() == Type.MAP) {
+      org.apache.avro.Schema valueType = schema.getValueType();
+      if (valueType.getType() == Type.UNION) {
+        return valueType.getTypes().contains(NULL_SCHEMA);
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Hamcrest matcher that walks a schema through record fields, union members, array elements and
+   * map values, matching if any visited schema satisfies the predicate.
+   */
+  static class ContainsField extends BaseMatcher<org.apache.avro.Schema> {
+
+    private final Function<org.apache.avro.Schema, Boolean> predicate;
+
+    ContainsField(final Function<org.apache.avro.Schema, Boolean> predicate) {
+      this.predicate = predicate;
+    }
+
+    @Override
+    public boolean matches(final Object item0) {
+      if (!(item0 instanceof org.apache.avro.Schema)) {
+        return false;
+      }
+
+      org.apache.avro.Schema item = (org.apache.avro.Schema) item0;
+
+      if (predicate.apply(item)) {
+        return true;
+      }
+
+      switch (item.getType()) {
+        case RECORD:
+          return item.getFields().stream().anyMatch(x -> matches(x.schema()));
+
+        case UNION:
+          return item.getTypes().stream().anyMatch(this::matches);
+
+        case ARRAY:
+          return matches(item.getElementType());
+
+        case MAP:
+          return matches(item.getValueType());
+
+        default:
+          return false;
+      }
+    }
+
+    @Override
+    public void describeTo(final Description description) {
+      // gives assumption/mismatch messages a meaningful description instead of an empty one
+      description.appendText("an AVRO schema containing a type that matches the predicate");
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 158213)
    Time Spent: 5h 20m  (was: 5h 10m)

> Add AvroIO.readRows
> -------------------
>
>                 Key: BEAM-5807
>                 URL: https://issues.apache.org/jira/browse/BEAM-5807
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Gleb Kanterov
>            Assignee: Gleb Kanterov
>            Priority: Major
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> # Motivation
> At the moment the only way to read AVRO is through code generation with 
> avro-compiler and JavaBeanSchema. This makes it impossible to write 
> transforms that work with dynamic schemas. AVRO has a generic data type 
> called GenericRecord, and reading it is implemented in 
> AvroIO.readGenericRecords. There is code to convert GenericRecord to Row 
> shipped as part of BigQueryIO; however, it doesn't support all types or 
> nested records.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to