This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch cleanup-plugins-dependency in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 7058021f65b9d74a2c4c3d9b83a990b588f9fd72 Author: Xiang Fu <[email protected]> AuthorDate: Fri Jan 10 15:15:42 2020 -0800 make pinot-parquet don't depend on pinot-avro --- .../pinot-input-format/pinot-parquet/pom.xml | 5 - .../inputformat/parquet/ParquetRecordReader.java | 3 +- .../plugin/inputformat/parquet/ParquetUtils.java | 196 +++++++++++++++++++++ .../parquet/ParquetRecordReaderTest.java | 3 +- 4 files changed, 198 insertions(+), 9 deletions(-) diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml b/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml index f3d06d9..401cf4a 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml +++ b/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml @@ -44,11 +44,6 @@ </build> <dependencies> <dependency> - <groupId>org.apache.pinot</groupId> - <artifactId>pinot-avro</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-avro</artifactId> </dependency> diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java index 2d05d6f..a28efce 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java @@ -25,7 +25,6 @@ import javax.annotation.Nullable; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetReader; -import org.apache.pinot.plugin.inputformat.avro.AvroUtils; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -49,7 +48,7 @@ public class ParquetRecordReader implements RecordReader { throws IOException { _dataFilePath = new Path(dataFile.getAbsolutePath()); _schema = schema; - AvroUtils.validateSchema(_schema, ParquetUtils.getParquetSchema(_dataFilePath)); + ParquetUtils.validateSchema(_schema, ParquetUtils.getParquetSchema(_dataFilePath)); _fieldSpecs = RecordReaderUtils.extractFieldSpecs(_schema); _reader = ParquetUtils.getParquetReader(_dataFilePath); diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java index 69e7742..16a5acc 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java @@ -18,10 +18,18 @@ */ package org.apache.pinot.plugin.inputformat.parquet; +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.util.List; import java.util.Map; +import java.util.zip.GZIPInputStream; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -34,10 +42,13 @@ import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.pinot.spi.data.FieldSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ParquetUtils { public static final String DEFAULT_FS = "file:///"; + private static final Logger LOGGER = LoggerFactory.getLogger(ParquetUtils.class); /** * Returns a ParquetReader with the given path. @@ -85,4 +96,189 @@ public class ParquetUtils { conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); return conf; } + + /** + * Helper method to build Avro schema from Pinot schema. + * + * @param pinotSchema Pinot schema. + * @return Avro schema. + */ + public static org.apache.avro.Schema getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema pinotSchema) { + SchemaBuilder.FieldAssembler<org.apache.avro.Schema> fieldAssembler = SchemaBuilder.record("record").fields(); + + for (FieldSpec fieldSpec : pinotSchema.getAllFieldSpecs()) { + FieldSpec.DataType dataType = fieldSpec.getDataType(); + if (fieldSpec.isSingleValueField()) { + switch (dataType) { + case INT: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().intType().noDefault(); + break; + case LONG: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().longType().noDefault(); + break; + case FLOAT: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().floatType().noDefault(); + break; + case DOUBLE: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().doubleType().noDefault(); + break; + case STRING: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().stringType().noDefault(); + break; + case BYTES: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().bytesType().noDefault(); + break; + default: + throw new RuntimeException("Unsupported data type: " + dataType); + } + } else { + switch (dataType) { + case INT: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().array().items().intType().noDefault(); + break; + case LONG: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().array().items().longType().noDefault(); + break; + case FLOAT: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().array().items().floatType().noDefault(); + break; + case DOUBLE: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().array().items().doubleType().noDefault(); + break; + case STRING: + fieldAssembler = fieldAssembler.name(fieldSpec.getName()).type().array().items().stringType().noDefault(); + break; + default: + throw new RuntimeException("Unsupported data type: " + dataType); + } + } + } + + return fieldAssembler.endRecord(); + } + + /** + * Validates Pinot schema against the given Avro schema. + */ + public static void validateSchema(org.apache.pinot.spi.data.Schema schema, org.apache.avro.Schema avroSchema) { + for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { + String fieldName = fieldSpec.getName(); + Schema.Field avroField = avroSchema.getField(fieldName); + if (avroField == null) { + LOGGER.warn("Pinot field: {} does not exist in Avro Schema", fieldName); + } else { + boolean isPinotFieldSingleValue = fieldSpec.isSingleValueField(); + boolean isAvroFieldSingleValue = isSingleValueField(avroField); + if (isPinotFieldSingleValue != isAvroFieldSingleValue) { + String errorMessage = "Pinot field: " + fieldName + " is " + (isPinotFieldSingleValue ? "Single" : "Multi") + + "-valued in Pinot schema but not in Avro schema"; + LOGGER.error(errorMessage); + throw new IllegalStateException(errorMessage); + } + + FieldSpec.DataType pinotFieldDataType = fieldSpec.getDataType(); + FieldSpec.DataType avroFieldDataType = extractFieldDataType(avroField); + if (pinotFieldDataType != avroFieldDataType) { + LOGGER.warn("Pinot field: {} of type: {} mismatches with corresponding field in Avro Schema of type: {}", + fieldName, pinotFieldDataType, avroFieldDataType); + } + } + } + } + + /** + * Get the Avro file reader for the given file. + */ + public static DataFileStream<GenericRecord> getAvroReader(File avroFile) + throws IOException { + if (avroFile.getName().endsWith(".gz")) { + return new DataFileStream<>(new GZIPInputStream(new FileInputStream(avroFile)), new GenericDatumReader<>()); + } else { + return new DataFileStream<>(new FileInputStream(avroFile), new GenericDatumReader<>()); + } + } + + /** + * Return whether the Avro field is a single-value field. + */ + public static boolean isSingleValueField(Schema.Field field) { + try { + org.apache.avro.Schema fieldSchema = extractSupportedSchema(field.schema()); + return fieldSchema.getType() != org.apache.avro.Schema.Type.ARRAY; + } catch (Exception e) { + throw new RuntimeException("Caught exception while extracting non-null schema from field: " + field.name(), e); + } + } + + /** + * Extract the data type stored in Pinot for the given Avro field. + */ + public static FieldSpec.DataType extractFieldDataType(Schema.Field field) { + try { + org.apache.avro.Schema fieldSchema = extractSupportedSchema(field.schema()); + org.apache.avro.Schema.Type fieldType = fieldSchema.getType(); + if (fieldType == org.apache.avro.Schema.Type.ARRAY) { + return valueOf(extractSupportedSchema(fieldSchema.getElementType()).getType()); + } else { + return valueOf(fieldType); + } + } catch (Exception e) { + throw new RuntimeException("Caught exception while extracting data type from field: " + field.name(), e); + } + } + + /** + * Helper method to extract the supported Avro schema from the given Avro field schema. + * <p>Currently we support INT/LONG/FLOAT/DOUBLE/BOOLEAN/STRING/ENUM + */ + private static org.apache.avro.Schema extractSupportedSchema(org.apache.avro.Schema fieldSchema) { + org.apache.avro.Schema.Type fieldType = fieldSchema.getType(); + if (fieldType == org.apache.avro.Schema.Type.UNION) { + org.apache.avro.Schema nonNullSchema = null; + for (org.apache.avro.Schema childFieldSchema : fieldSchema.getTypes()) { + if (childFieldSchema.getType() != org.apache.avro.Schema.Type.NULL) { + if (nonNullSchema == null) { + nonNullSchema = childFieldSchema; + } else { + throw new IllegalStateException("More than one non-null schema in UNION schema"); + } + } + } + if (nonNullSchema != null) { + return extractSupportedSchema(nonNullSchema); + } else { + throw new IllegalStateException("Cannot find non-null schema in UNION schema"); + } + } else if (fieldType == org.apache.avro.Schema.Type.RECORD) { + List<Schema.Field> recordFields = fieldSchema.getFields(); + Preconditions.checkState(recordFields.size() == 1, "Not one field in the RECORD schema"); + return extractSupportedSchema(recordFields.get(0).schema()); + } else { + return fieldSchema; + } + } + + /** + * Returns the data type stored in Pinot that is associated with the given Avro type. + */ + public static FieldSpec.DataType valueOf(org.apache.avro.Schema.Type avroType) { + switch (avroType) { + case INT: + return FieldSpec.DataType.INT; + case LONG: + return FieldSpec.DataType.LONG; + case FLOAT: + return FieldSpec.DataType.FLOAT; + case DOUBLE: + return FieldSpec.DataType.DOUBLE; + case BOOLEAN: + case STRING: + case ENUM: + return FieldSpec.DataType.STRING; + case BYTES: + return FieldSpec.DataType.BYTES; + default: + throw new UnsupportedOperationException("Unsupported Avro type: " + avroType); + } + } } diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java index 9e5b0e8..4761e46 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java @@ -27,7 +27,6 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.pinot.plugin.inputformat.avro.AvroUtils; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest; import org.apache.pinot.spi.data.readers.RecordReader; @@ -47,7 +46,7 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest { @Override protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite) throws Exception { - Schema schema = AvroUtils.getAvroSchemaFromPinotSchema(getPinotSchema()); + Schema schema = ParquetUtils.getAvroSchemaFromPinotSchema(getPinotSchema()); List<GenericRecord> records = new ArrayList<>(); for (Map<String, Object> r : recordsToWrite) { GenericRecord record = new GenericData.Record(schema); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
