paul-rogers commented on a change in pull request #1951: DRILL-7454: Convert 
Avro to EVF
URL: https://github.com/apache/drill/pull/1951#discussion_r363133713
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSchemaUtil.java
 ##########
 @@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.DictBuilder;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.RepeatedListBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class that provides methods to interact with Avro schema.
+ */
+public class AvroSchemaUtil {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(AvroSchemaUtil.class);
+
+  public static final String AVRO_LOGICAL_TYPE_PROPERTY = "avro_logical_type";
+
+  public static final String DECIMAL_LOGICAL_TYPE = "decimal";
+  public static final String TIMESTAMP_MICROS_LOGICAL_TYPE = 
"timestamp-micros";
+  public static final String TIMESTAMP_MILLIS_LOGICAL_TYPE = 
"timestamp-millis";
+  public static final String DATE_LOGICAL_TYPE = "date";
+  public static final String TIME_MICROS_LOGICAL_TYPE = "time-micros";
+  public static final String TIME_MILLIS_LOGICAL_TYPE = "time-millis";
+  public static final String DURATION_LOGICAL_TYPE = "duration";
+
+  /**
+   * Converts Avro schema into Drill metadata description of the schema.
+   *
+   * @param schema Avro schema
+   * @return metadata description of the schema
+   * @throws UserException if schema contains unsupported types
+   */
+  public static TupleMetadata convert(Schema schema) {
+    return SchemaConverter.INSTANCE.convert(schema);
+  }
+
+  /**
+   * Avro represents nullable type as union of null and another schema: 
["null", "some-type"].
+   * This method extracts non-nullable schema for given union schema.
+   *
+   * @param schema Avro schema
+   * @param columnName column name
+   * @return non-nullable Avro schema
+   * @throws UserException if given schema is not a union or represents 
complex union
+   */
+  public static Schema extractSchemaFromNullable(Schema schema, String 
columnName) {
+    if (!schema.isUnion()) {
+      throw UserException.validationError()
+        .message("Expected union type, but received: %s", schema.getType())
+        .addContext("Column", columnName)
+        .build(logger);
+    }
+    List<Schema> unionSchemas = schema.getTypes();
+
+    // exclude all schemas with null type
+    List<Schema> nonNullSchemas = unionSchemas.stream()
+      .filter(unionSchema -> !Schema.Type.NULL.equals(unionSchema.getType()))
+      .collect(Collectors.toList());
+
+    // if original schema has two elements and only one non-nullable schema, 
this is simple nullable type
+    if (unionSchemas.size() == 2 && nonNullSchemas.size() == 1) {
+      return nonNullSchemas.get(0);
+    } else {
+      return throwUnsupportedErrorForType("complex union", columnName);
+    }
+  }
+
+  private static <T> T throwUnsupportedErrorForType(String type, String 
columnName) {
+    throw UserException.unsupportedError()
+      .message("'%s' type is not supported", type)
+      .addContext("Column", columnName)
+      .build(logger);
+  }
+
+  /**
+   * Class is responsible for converting Avro schema into Drill metadata 
description of the schema.
+   * It does not hold state and thus is thread-safe.
+   */
+  private static class SchemaConverter {
+
+    private static final SchemaConverter INSTANCE = new SchemaConverter();
+
+    TupleMetadata convert(Schema schema) {
+      /*
+        Avro allows to reference types by name, sometimes reference can be 
done to the type under construction.
+        For example, a linked-list of 64-bit values:
+        {
+          "type": "record",
+          "name": "LongList",
+          "fields" : [
+             {"name": "value", "type": "long"},             // each element 
has a long
+             {"name": "next", "type": ["null", "LongList"]} // optional next 
element
+           ]
+        }
+
+        Since we cannot build record type which is not constructed yet, when 
such situation is detected,
+        record type is set to Drill Map without columns, columns will be 
detected when reading actual data.
+
+        `typeNamesUnderConstruction` is a holder to store record type names 
under construction to detect
+         reference to the types which are not yet constructed.
+       */
+      Set<String> typeNamesUnderConstruction = new HashSet<>();
+      TupleSchema tupleSchema = new TupleSchema();
+
+      // add current record type to the set of types under construction
+      typeNamesUnderConstruction.add(schema.getFullName());
+
+      List<Schema.Field> fields = schema.getFields();
+      fields.stream()
+        .map(field -> convert(field, typeNamesUnderConstruction))
+        .forEach(tupleSchema::add);
+      return tupleSchema;
+    }
+
+    private ColumnMetadata convert(Schema.Field field, Set<String> 
typeNamesUnderConstruction) {
+      Schema fieldSchema = field.schema();
+      return defineColumn(field.name(), fieldSchema, 
TypeProtos.DataMode.REQUIRED, typeNamesUnderConstruction);
+    }
+
+    private ColumnMetadata defineColumn(String name, Schema fieldSchema,
 
 Review comment:
   Are there any cases where two distinct Avro types or variations map to a 
single Drill type? If so, you'd later want to create distinct 
`ColumnConverter`s for each variation. This might be a handy place to use 
`ColumnMetadata` properties to capture those nuances to be used later when 
creating the column converter.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to