This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch revert-3852-parquet_reader in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit bb57c1053c2c2e96da14da8e8103f057b9b938c5 Author: Seunghyun Lee <[email protected]> AuthorDate: Wed Apr 3 14:26:13 2019 -0700 Revert "add support for parquet reader (#3852)" This reverts commit 69db49139e0acec1c76398c71420cb5832622703. --- pinot-common/pom.xml | 5 - .../apache/pinot/common/utils/ParquetUtils.java | 99 ------------------ pinot-core/pom.xml | 5 - .../pinot/core/data/readers/AvroRecordReader.java | 31 +++++- .../apache/pinot/core/data/readers/FileFormat.java | 2 +- .../core/data/readers/ParquetRecordReader.java | 116 --------------------- .../core/data/readers/RecordReaderFactory.java | 2 - .../java/org/apache/pinot/core/util/AvroUtils.java | 36 ------- .../core/data/readers/ParquetRecordReaderTest.java | 98 ----------------- .../pinot/hadoop/job/SegmentCreationJob.java | 2 +- .../hadoop/job/mapper/SegmentCreationMapper.java | 3 - pom.xml | 1 - 12 files changed, 32 insertions(+), 368 deletions(-) diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml index 2a5d180..55485a7 100644 --- a/pinot-common/pom.xml +++ b/pinot-common/pom.xml @@ -313,11 +313,6 @@ <groupId>org.glassfish.jersey.core</groupId> <artifactId>jersey-server</artifactId> </dependency> - <dependency> - <groupId>org.apache.parquet</groupId> - <artifactId>parquet-avro</artifactId> - <version>${parquet.version}</version> - </dependency> </dependencies> <profiles> <profile> diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ParquetUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ParquetUtils.java deleted file mode 100644 index 234346d..0000000 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ParquetUtils.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.common.utils; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.avro.AvroParquetReader; -import org.apache.parquet.avro.AvroParquetWriter; -import org.apache.parquet.avro.AvroSchemaConverter; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.ParquetReader; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; - -import java.io.IOException; - - -public class ParquetUtils { - public static final String DEFAULT_FS = "file:///"; - - /** - * Get a ParquetReader with the given file. - * @param fileName the parquet file to read - * @return a ParquetReader - * @throws IOException - */ - public static ParquetReader<GenericRecord> getParquetReader(String fileName) - throws IOException { - Path dataFsPath = new Path(fileName); - return AvroParquetReader.<GenericRecord>builder(dataFsPath).disableCompatibility().withDataModel(GenericData.get()) - .withConf(getConfiguration()).build(); - } - - /** - * Read the parquet file schema - * @param fileName - * @return the parquet file schema - * @throws IOException - */ - public static Schema getParquetSchema(String fileName) - throws IOException { - Path dataFsPath = new Path(fileName); - ParquetMetadata footer = ParquetFileReader.readFooter(getConfiguration(), dataFsPath); - - String schemaString = footer.getFileMetaData().getKeyValueMetaData().get("parquet.avro.schema"); - if (schemaString == null) { - // try the older property - schemaString = footer.getFileMetaData().getKeyValueMetaData().get("avro.schema"); - } - - if (schemaString != null) { - return new Schema.Parser().parse(schemaString); - } else { - return new AvroSchemaConverter().convert(footer.getFileMetaData().getSchema()); - } - } - - /** - * Get a ParquetWriter with the given file - * @param fileName - * @param schema - * @return a ParquetWriter - * @throws IOException - */ - public static ParquetWriter<GenericRecord> getParquetWriter(String fileName, Schema schema) - throws IOException { - Path dataFsPath = new Path(fileName); - return AvroParquetWriter.<GenericRecord>builder(dataFsPath).withSchema(schema).withConf(getConfiguration()).build(); - } - - private static Configuration getConfiguration() { - // The file path used in ParquetRecordReader is a local file path without prefix 'file:///', - // so we have to make sure that the configuration item 'fs.defaultFS' is set to 'file:///' - // in case that user's hadoop conf overwrite this item - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", DEFAULT_FS); - conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - return conf; - } -} diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml index 416a91f..a82b8ee 100644 --- a/pinot-core/pom.xml +++ b/pinot-core/pom.xml @@ -208,11 +208,6 @@ <artifactId>equalsverifier</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.commons</groupId> diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java index 4102108..c5ca4cc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java @@ -21,9 +21,11 @@ package org.apache.pinot.core.data.readers; import java.io.File; import java.io.IOException; import java.util.List; +import org.apache.avro.Schema.Field; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericRecord; import org.apache.pinot.common.data.FieldSpec; +import org.apache.pinot.common.data.FieldSpec.DataType; import org.apache.pinot.common.data.Schema; import org.apache.pinot.core.data.GenericRow; import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; @@ -52,7 +54,7 @@ public class AvroRecordReader implements RecordReader { _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema); _avroReader = AvroUtils.getAvroReader(dataFile); try { - AvroUtils.validateSchema(_schema, _avroReader.getSchema()); + validateSchema(); } catch (Exception e) { _avroReader.close(); throw e; @@ -64,6 +66,33 @@ public class AvroRecordReader implements RecordReader { } + private void validateSchema() { + org.apache.avro.Schema avroSchema = _avroReader.getSchema(); + for (FieldSpec fieldSpec : _fieldSpecs) { + String fieldName = fieldSpec.getName(); + Field avroField = avroSchema.getField(fieldName); + if (avroField == null) { + LOGGER.warn("Pinot field: {} does not exist in Avro Schema", fieldName); + } else { + boolean isPinotFieldSingleValue = fieldSpec.isSingleValueField(); + boolean isAvroFieldSingleValue = AvroUtils.isSingleValueField(avroField); + if (isPinotFieldSingleValue != isAvroFieldSingleValue) { + String errorMessage = "Pinot field: " + fieldName + " is " + (isPinotFieldSingleValue ? "Single" : "Multi") + + "-valued in Pinot schema but not in Avro schema"; + LOGGER.error(errorMessage); + throw new IllegalStateException(errorMessage); + } + + DataType pinotFieldDataType = fieldSpec.getDataType(); + DataType avroFieldDataType = AvroUtils.extractFieldDataType(avroField); + if (pinotFieldDataType != avroFieldDataType) { + LOGGER.warn("Pinot field: {} of type: {} mismatches with corresponding field in Avro Schema of type: {}", + fieldName, pinotFieldDataType, avroFieldDataType); + } + } + } + } + @Override public boolean hasNext() { return _avroReader.hasNext(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java index 43f8391..5f7120d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java @@ -19,5 +19,5 @@ package org.apache.pinot.core.data.readers; public enum FileFormat { - AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, OTHER + AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, OTHER } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ParquetRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ParquetRecordReader.java deleted file mode 100644 index 6067540..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ParquetRecordReader.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.data.readers; - -import java.util.List; -import org.apache.avro.generic.GenericRecord; -import org.apache.parquet.hadoop.ParquetReader; -import org.apache.pinot.common.data.FieldSpec; -import org.apache.pinot.common.data.Schema; -import org.apache.pinot.common.utils.ParquetUtils; -import org.apache.pinot.core.data.GenericRow; -import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; -import org.apache.pinot.core.util.AvroUtils; - -import java.io.File; -import java.io.IOException; - - -/** - * Record reader for Parquet file. - */ -public class ParquetRecordReader implements RecordReader { - private final String _dataFilePath; - private final Schema _schema; - private final List<FieldSpec> _fieldSpecs; - - private ParquetReader<GenericRecord> _reader; - private GenericRecord _next; - private boolean _hasNext; - - public ParquetRecordReader(File dataFile, Schema schema) - throws IOException { - _dataFilePath = dataFile.getAbsolutePath(); - _schema = schema; - _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema); - - _reader = ParquetUtils.getParquetReader(_dataFilePath); - advanceToNext(); - - AvroUtils.validateSchema(_schema, ParquetUtils.getParquetSchema(_dataFilePath)); - } - - @Override - public void init(SegmentGeneratorConfig segmentGeneratorConfig) { - - } - - @Override - public boolean hasNext() { - return _hasNext; - } - - @Override - public GenericRow next() - throws IOException { - return next(new GenericRow()); - } - - @Override - public GenericRow next(GenericRow reuse) - throws IOException { - for (FieldSpec fieldSpec : _fieldSpecs) { - String fieldName = fieldSpec.getName(); - Object value = _next.get(fieldName); - // Allow default value for non-time columns - if (value != null || fieldSpec.getFieldType() != FieldSpec.FieldType.TIME) { - reuse.putField(fieldName, RecordReaderUtils.convert(fieldSpec, value)); - } - } - advanceToNext(); - return reuse; - } - - @Override - public void rewind() - throws IOException { - _reader = ParquetUtils.getParquetReader(_dataFilePath); - advanceToNext(); - } - - @Override - public Schema getSchema() { - return _schema; - } - - @Override - public void close() - throws IOException { - _reader.close(); - } - - private void advanceToNext() { - try { - _next = _reader.read(); - _hasNext = (_next != null); - } catch (IOException e) { - throw new RuntimeException("Failed while reading parquet file: " + _dataFilePath, e); - } - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java index 740004f..7909905 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java @@ -70,8 +70,6 @@ public class RecordReaderFactory { // NOTE: PinotSegmentRecordReader does not support time conversion (field spec must match) case PINOT: return new PinotSegmentRecordReader(dataFile, schema, segmentGeneratorConfig.getColumnSortOrder()); - case PARQUET: - return new ParquetRecordReader(dataFile, schema); default: throw new UnsupportedOperationException("Unsupported input file format: " + fileFormat); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java index b11c252..a6b4dd5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -32,7 +31,6 @@ import javax.annotation.Nullable; import org.apache.avro.Schema.Field; import org.apache.avro.SchemaBuilder; import org.apache.avro.file.DataFileStream; -import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.pinot.common.data.DimensionFieldSpec; @@ -40,14 +38,9 @@ import org.apache.pinot.common.data.FieldSpec; import org.apache.pinot.common.data.MetricFieldSpec; import org.apache.pinot.common.data.Schema; import org.apache.pinot.common.data.TimeFieldSpec; -import org.apache.pinot.core.data.GenericRow; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class AvroUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(AvroUtils.class); - private AvroUtils() { } @@ -275,33 +268,4 @@ public class AvroUtils { return fieldSchema; } } - - /** - * Valid table schema with avro schema - */ - public static void validateSchema(Schema schema, org.apache.avro.Schema avroSchema) { - for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { - String fieldName = fieldSpec.getName(); - Field avroField = avroSchema.getField(fieldName); - if (avroField == null) { - LOGGER.warn("Pinot field: {} does not exist in Avro Schema", fieldName); - } else { - boolean isPinotFieldSingleValue = fieldSpec.isSingleValueField(); - boolean isAvroFieldSingleValue = AvroUtils.isSingleValueField(avroField); - if (isPinotFieldSingleValue != isAvroFieldSingleValue) { - String errorMessage = "Pinot field: " + fieldName + " is " + (isPinotFieldSingleValue ? "Single" : "Multi") - + "-valued in Pinot schema but not in Avro schema"; - LOGGER.error(errorMessage); - throw new IllegalStateException(errorMessage); - } - - FieldSpec.DataType pinotFieldDataType = fieldSpec.getDataType(); - FieldSpec.DataType avroFieldDataType = AvroUtils.extractFieldDataType(avroField); - if (pinotFieldDataType != avroFieldDataType) { - LOGGER.warn("Pinot field: {} of type: {} mismatches with corresponding field in Avro Schema of type: {}", - fieldName, pinotFieldDataType, avroFieldDataType); - } - } - } - } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/ParquetRecordReaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/ParquetRecordReaderTest.java deleted file mode 100644 index 65ef0ede..0000000 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/ParquetRecordReaderTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.data.readers; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.commons.io.FileUtils; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.pinot.common.utils.ParquetUtils; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - - -public class ParquetRecordReaderTest extends RecordReaderTest { - private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "ParquetRecordReaderTest"); - private static final File DATA_FILE = new File(TEMP_DIR, "data.parquet"); - private static final String DATA_FILE_PATH = DATA_FILE.getAbsolutePath(); - - @BeforeClass - public void setUp() - throws Exception { - FileUtils.forceMkdir(TEMP_DIR); - - String strSchema = - "{\n" + " \"name\": \"AvroParquetTest\",\n" + " \"type\": \"record\",\n" + " \"fields\": [\n" - + " {\n" + " \"name\": \"INT_SV\",\n" + " \"type\": [ \"int\", \"null\"],\n" - + " \"default\": 0 \n" + " },\n" + " {\n" + " \"name\": \"INT_MV\",\n" - + " \"type\": [{\n" + " \"type\": \"array\",\n" - + " \"items\": \"int\"\n" + " }, \"null\"]\n" + " }\n" + " ]\n" + "}"; - - Schema schema = new Schema.Parser().parse(strSchema); - List<GenericRecord> records = new ArrayList<>(); - - for (Object[] r : RECORDS) { - GenericRecord record = new GenericData.Record(schema); - if (r[0] != null) { - record.put("INT_SV", r[0]); - } else { - record.put("INT_SV", 0); - } - - if (r[1] != null) { - record.put("INT_MV", r[1]); - } else { - record.put("INT_MV", new int[]{-1}); - } - - records.add(record); - } - - ParquetWriter<GenericRecord> writer = ParquetUtils.getParquetWriter(DATA_FILE_PATH, schema); - try { - for (GenericRecord r : records) { - writer.write(r); - } - } finally { - writer.close(); - } - } - - @Test - public void testParquetRecordReader() - throws Exception { - try (ParquetRecordReader recordReader = new ParquetRecordReader(DATA_FILE, SCHEMA)) { - checkValue(recordReader); - recordReader.rewind(); - checkValue(recordReader); - } - } - - @AfterClass - public void tearDown() - throws Exception { - FileUtils.forceDelete(TEMP_DIR); - } -} diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java index 59606a0..010be24 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java @@ -109,7 +109,7 @@ public class SegmentCreationJob extends BaseSegmentJob { return true; } return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName - .endsWith(".thrift") || fileName.endsWith(".parquet"); + .endsWith(".thrift"); } public void run() diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java index 4c99880..21f3f16 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java @@ -276,9 +276,6 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab if (fileName.endsWith(".thrift")) { return FileFormat.THRIFT; } - if (fileName.endsWith(".parquet")) { - return FileFormat.PARQUET; - } throw new IllegalArgumentException("Unsupported file format: {}" + fileName); } diff --git a/pom.xml b/pom.xml index bdc2e4c..0756f8b 100644 --- a/pom.xml +++ b/pom.xml @@ -116,7 +116,6 @@ <SKIP_INTEGRATION_TESTS>true</SKIP_INTEGRATION_TESTS> <!-- Configuration for unit/integration tests section 1 of 3 (properties) ENDS HERE.--> <avro.version>1.7.6</avro.version> - <parquet.version>1.8.0</parquet.version> <helix.version>0.8.2</helix.version> <!-- jfim: for Kafka 0.9.0.0, use zkclient 0.7 --> <kafka.version>0.9.0.1</kafka.version> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
