This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch cleanup-plugins-dependency
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7058021f65b9d74a2c4c3d9b83a990b588f9fd72
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Jan 10 15:15:42 2020 -0800

    make pinot-parquet not depend on pinot-avro
---
 .../pinot-input-format/pinot-parquet/pom.xml       |   5 -
 .../inputformat/parquet/ParquetRecordReader.java   |   3 +-
 .../plugin/inputformat/parquet/ParquetUtils.java   | 196 +++++++++++++++++++++
 .../parquet/ParquetRecordReaderTest.java           |   3 +-
 4 files changed, 198 insertions(+), 9 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml 
b/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml
index f3d06d9..401cf4a 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml
@@ -44,11 +44,6 @@
   </build>
   <dependencies>
     <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-avro</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
     </dependency>
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 2d05d6f..a28efce 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -25,7 +25,6 @@ import javax.annotation.Nullable;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -49,7 +48,7 @@ public class ParquetRecordReader implements RecordReader {
       throws IOException {
     _dataFilePath = new Path(dataFile.getAbsolutePath());
     _schema = schema;
-    AvroUtils.validateSchema(_schema, 
ParquetUtils.getParquetSchema(_dataFilePath));
+    ParquetUtils.validateSchema(_schema, 
ParquetUtils.getParquetSchema(_dataFilePath));
 
     _fieldSpecs = RecordReaderUtils.extractFieldSpecs(_schema);
     _reader = ParquetUtils.getParquetReader(_dataFilePath);
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index 69e7742..16a5acc 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -18,10 +18,18 @@
  */
 package org.apache.pinot.plugin.inputformat.parquet;
 
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
+import java.util.zip.GZIPInputStream;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -34,10 +42,13 @@ import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ParquetUtils {
   public static final String DEFAULT_FS = "file:///";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ParquetUtils.class);
 
   /**
    * Returns a ParquetReader with the given path.
@@ -85,4 +96,189 @@ public class ParquetUtils {
     conf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
     return conf;
   }
+
+  /**
+   * Helper method to build Avro schema from Pinot schema.
+   *
+   * @param pinotSchema Pinot schema.
+   * @return Avro schema.
+   */
+  public static org.apache.avro.Schema 
getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema pinotSchema) {
+    SchemaBuilder.FieldAssembler<org.apache.avro.Schema> fieldAssembler = 
SchemaBuilder.record("record").fields();
+
+    for (FieldSpec fieldSpec : pinotSchema.getAllFieldSpecs()) {
+      FieldSpec.DataType dataType = fieldSpec.getDataType();
+      if (fieldSpec.isSingleValueField()) {
+        switch (dataType) {
+          case INT:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().intType().noDefault();
+            break;
+          case LONG:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().longType().noDefault();
+            break;
+          case FLOAT:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().floatType().noDefault();
+            break;
+          case DOUBLE:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().doubleType().noDefault();
+            break;
+          case STRING:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().stringType().noDefault();
+            break;
+          case BYTES:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().bytesType().noDefault();
+            break;
+          default:
+            throw new RuntimeException("Unsupported data type: " + dataType);
+        }
+      } else {
+        switch (dataType) {
+          case INT:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().array().items().intType().noDefault();
+            break;
+          case LONG:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().array().items().longType().noDefault();
+            break;
+          case FLOAT:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().array().items().floatType().noDefault();
+            break;
+          case DOUBLE:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().array().items().doubleType().noDefault();
+            break;
+          case STRING:
+            fieldAssembler = 
fieldAssembler.name(fieldSpec.getName()).type().array().items().stringType().noDefault();
+            break;
+          default:
+            throw new RuntimeException("Unsupported data type: " + dataType);
+        }
+      }
+    }
+
+    return fieldAssembler.endRecord();
+  }
+
+  /**
+   * Validates Pinot schema against the given Avro schema.
+   */
+  public static void validateSchema(org.apache.pinot.spi.data.Schema schema, 
org.apache.avro.Schema avroSchema) {
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      String fieldName = fieldSpec.getName();
+      Schema.Field avroField = avroSchema.getField(fieldName);
+      if (avroField == null) {
+        LOGGER.warn("Pinot field: {} does not exist in Avro Schema", 
fieldName);
+      } else {
+        boolean isPinotFieldSingleValue = fieldSpec.isSingleValueField();
+        boolean isAvroFieldSingleValue = isSingleValueField(avroField);
+        if (isPinotFieldSingleValue != isAvroFieldSingleValue) {
+          String errorMessage = "Pinot field: " + fieldName + " is " + 
(isPinotFieldSingleValue ? "Single" : "Multi")
+              + "-valued in Pinot schema but not in Avro schema";
+          LOGGER.error(errorMessage);
+          throw new IllegalStateException(errorMessage);
+        }
+
+        FieldSpec.DataType pinotFieldDataType = fieldSpec.getDataType();
+        FieldSpec.DataType avroFieldDataType = extractFieldDataType(avroField);
+        if (pinotFieldDataType != avroFieldDataType) {
+          LOGGER.warn("Pinot field: {} of type: {} mismatches with 
corresponding field in Avro Schema of type: {}",
+              fieldName, pinotFieldDataType, avroFieldDataType);
+        }
+      }
+    }
+  }
+
+  /**
+   * Get the Avro file reader for the given file.
+   */
+  public static DataFileStream<GenericRecord> getAvroReader(File avroFile)
+      throws IOException {
+    if (avroFile.getName().endsWith(".gz")) {
+      return new DataFileStream<>(new GZIPInputStream(new 
FileInputStream(avroFile)), new GenericDatumReader<>());
+    } else {
+      return new DataFileStream<>(new FileInputStream(avroFile), new 
GenericDatumReader<>());
+    }
+  }
+
+  /**
+   * Return whether the Avro field is a single-value field.
+   */
+  public static boolean isSingleValueField(Schema.Field field) {
+    try {
+      org.apache.avro.Schema fieldSchema = 
extractSupportedSchema(field.schema());
+      return fieldSchema.getType() != org.apache.avro.Schema.Type.ARRAY;
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while extracting non-null 
schema from field: " + field.name(), e);
+    }
+  }
+
+  /**
+   * Extract the data type stored in Pinot for the given Avro field.
+   */
+  public static FieldSpec.DataType extractFieldDataType(Schema.Field field) {
+    try {
+      org.apache.avro.Schema fieldSchema = 
extractSupportedSchema(field.schema());
+      org.apache.avro.Schema.Type fieldType = fieldSchema.getType();
+      if (fieldType == org.apache.avro.Schema.Type.ARRAY) {
+        return 
valueOf(extractSupportedSchema(fieldSchema.getElementType()).getType());
+      } else {
+        return valueOf(fieldType);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while extracting data type 
from field: " + field.name(), e);
+    }
+  }
+
+  /**
+   * Helper method to extract the supported Avro schema from the given Avro 
field schema.
+   * <p>Currently we support INT/LONG/FLOAT/DOUBLE/BOOLEAN/STRING/ENUM/BYTES
+   */
+  private static org.apache.avro.Schema 
extractSupportedSchema(org.apache.avro.Schema fieldSchema) {
+    org.apache.avro.Schema.Type fieldType = fieldSchema.getType();
+    if (fieldType == org.apache.avro.Schema.Type.UNION) {
+      org.apache.avro.Schema nonNullSchema = null;
+      for (org.apache.avro.Schema childFieldSchema : fieldSchema.getTypes()) {
+        if (childFieldSchema.getType() != org.apache.avro.Schema.Type.NULL) {
+          if (nonNullSchema == null) {
+            nonNullSchema = childFieldSchema;
+          } else {
+            throw new IllegalStateException("More than one non-null schema in 
UNION schema");
+          }
+        }
+      }
+      if (nonNullSchema != null) {
+        return extractSupportedSchema(nonNullSchema);
+      } else {
+        throw new IllegalStateException("Cannot find non-null schema in UNION 
schema");
+      }
+    } else if (fieldType == org.apache.avro.Schema.Type.RECORD) {
+      List<Schema.Field> recordFields = fieldSchema.getFields();
+      Preconditions.checkState(recordFields.size() == 1, "Not one field in the 
RECORD schema");
+      return extractSupportedSchema(recordFields.get(0).schema());
+    } else {
+      return fieldSchema;
+    }
+  }
+
+  /**
+   * Returns the data type stored in Pinot that is associated with the given 
Avro type.
+   */
+  public static FieldSpec.DataType valueOf(org.apache.avro.Schema.Type 
avroType) {
+    switch (avroType) {
+      case INT:
+        return FieldSpec.DataType.INT;
+      case LONG:
+        return FieldSpec.DataType.LONG;
+      case FLOAT:
+        return FieldSpec.DataType.FLOAT;
+      case DOUBLE:
+        return FieldSpec.DataType.DOUBLE;
+      case BOOLEAN:
+      case STRING:
+      case ENUM:
+        return FieldSpec.DataType.STRING;
+      case BYTES:
+        return FieldSpec.DataType.BYTES;
+      default:
+        throw new UnsupportedOperationException("Unsupported Avro type: " + 
avroType);
+    }
+  }
 }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 9e5b0e8..4761e46 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -27,7 +27,6 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
 import org.apache.pinot.spi.data.readers.RecordReader;
@@ -47,7 +46,7 @@ public class ParquetRecordReaderTest extends 
AbstractRecordReaderTest {
   @Override
   protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       throws Exception {
-    Schema schema = AvroUtils.getAvroSchemaFromPinotSchema(getPinotSchema());
+    Schema schema = 
ParquetUtils.getAvroSchemaFromPinotSchema(getPinotSchema());
     List<GenericRecord> records = new ArrayList<>();
     for (Map<String, Object> r : recordsToWrite) {
       GenericRecord record = new GenericData.Record(schema);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to