This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 69db491  add support for parquet reader (#3852)
69db491 is described below

commit 69db49139e0acec1c76398c71420cb5832622703
Author: WangGuangxin <[email protected]>
AuthorDate: Wed Apr 3 12:21:43 2019 +0800

    add support for parquet reader (#3852)
    
    * add parquet reader
    
    * refactor code
    
    * move code to test package
    
    * refactor
---
 pinot-common/pom.xml                               |   5 +
 .../apache/pinot/common/utils/ParquetUtils.java    |  99 ++++++++++++++++++
 pinot-core/pom.xml                                 |   5 +
 .../pinot/core/data/readers/AvroRecordReader.java  |  31 +-----
 .../apache/pinot/core/data/readers/FileFormat.java |   2 +-
 .../core/data/readers/ParquetRecordReader.java     | 116 +++++++++++++++++++++
 .../core/data/readers/RecordReaderFactory.java     |   2 +
 .../java/org/apache/pinot/core/util/AvroUtils.java |  36 +++++++
 .../core/data/readers/ParquetRecordReaderTest.java |  98 +++++++++++++++++
 .../pinot/hadoop/job/SegmentCreationJob.java       |   2 +-
 .../hadoop/job/mapper/SegmentCreationMapper.java   |   3 +
 pom.xml                                            |   1 +
 12 files changed, 368 insertions(+), 32 deletions(-)

diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 55485a7..2a5d180 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -313,6 +313,11 @@
       <groupId>org.glassfish.jersey.core</groupId>
       <artifactId>jersey-server</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>${parquet.version}</version>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ParquetUtils.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ParquetUtils.java
new file mode 100644
index 0000000..234346d
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ParquetUtils.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import java.io.IOException;
+
+
+public class ParquetUtils {
+  public static final String DEFAULT_FS = "file:///";
+
+  /**
+   * Get a ParquetReader with the given file.
+   * @param fileName the parquet file to read
+   * @return a ParquetReader
+   * @throws IOException
+   */
+  public static ParquetReader<GenericRecord> getParquetReader(String fileName)
+      throws IOException {
+    Path dataFsPath = new Path(fileName);
+    return 
AvroParquetReader.<GenericRecord>builder(dataFsPath).disableCompatibility().withDataModel(GenericData.get())
+        .withConf(getConfiguration()).build();
+  }
+
+  /**
+   * Read the parquet file schema
+   * @param fileName
+   * @return the parquet file schema
+   * @throws IOException
+   */
+  public static Schema getParquetSchema(String fileName)
+      throws IOException {
+    Path dataFsPath = new Path(fileName);
+    ParquetMetadata footer = ParquetFileReader.readFooter(getConfiguration(), 
dataFsPath);
+
+    String schemaString = 
footer.getFileMetaData().getKeyValueMetaData().get("parquet.avro.schema");
+    if (schemaString == null) {
+      // try the older property
+      schemaString = 
footer.getFileMetaData().getKeyValueMetaData().get("avro.schema");
+    }
+
+    if (schemaString != null) {
+      return new Schema.Parser().parse(schemaString);
+    } else {
+      return new 
AvroSchemaConverter().convert(footer.getFileMetaData().getSchema());
+    }
+  }
+
+  /**
+   * Get a ParquetWriter with the given file
+   * @param fileName
+   * @param schema
+   * @return a ParquetWriter
+   * @throws IOException
+   */
+  public static ParquetWriter<GenericRecord> getParquetWriter(String fileName, 
Schema schema)
+      throws IOException {
+    Path dataFsPath = new Path(fileName);
+    return 
AvroParquetWriter.<GenericRecord>builder(dataFsPath).withSchema(schema).withConf(getConfiguration()).build();
+  }
+
+  private static Configuration getConfiguration() {
+    // The file path used in ParquetRecordReader is a local file path without the 'file:///' prefix,
+    // so we have to make sure that the configuration item 'fs.defaultFS' is set to 'file:///'
+    // in case the user's Hadoop configuration overrides this item.
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", DEFAULT_FS);
+    conf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
+    return conf;
+  }
+}
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index a82b8ee..416a91f 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -208,6 +208,11 @@
       <artifactId>equalsverifier</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>test</scope>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.commons</groupId>
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
index c5ca4cc..4102108 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
@@ -21,11 +21,9 @@ package org.apache.pinot.core.data.readers;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import org.apache.avro.Schema.Field;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.common.data.FieldSpec;
-import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -54,7 +52,7 @@ public class AvroRecordReader implements RecordReader {
     _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema);
     _avroReader = AvroUtils.getAvroReader(dataFile);
     try {
-      validateSchema();
+      AvroUtils.validateSchema(_schema, _avroReader.getSchema());
     } catch (Exception e) {
       _avroReader.close();
       throw e;
@@ -66,33 +64,6 @@ public class AvroRecordReader implements RecordReader {
 
   }
 
-  private void validateSchema() {
-    org.apache.avro.Schema avroSchema = _avroReader.getSchema();
-    for (FieldSpec fieldSpec : _fieldSpecs) {
-      String fieldName = fieldSpec.getName();
-      Field avroField = avroSchema.getField(fieldName);
-      if (avroField == null) {
-        LOGGER.warn("Pinot field: {} does not exist in Avro Schema", 
fieldName);
-      } else {
-        boolean isPinotFieldSingleValue = fieldSpec.isSingleValueField();
-        boolean isAvroFieldSingleValue = 
AvroUtils.isSingleValueField(avroField);
-        if (isPinotFieldSingleValue != isAvroFieldSingleValue) {
-          String errorMessage = "Pinot field: " + fieldName + " is " + 
(isPinotFieldSingleValue ? "Single" : "Multi")
-              + "-valued in Pinot schema but not in Avro schema";
-          LOGGER.error(errorMessage);
-          throw new IllegalStateException(errorMessage);
-        }
-
-        DataType pinotFieldDataType = fieldSpec.getDataType();
-        DataType avroFieldDataType = AvroUtils.extractFieldDataType(avroField);
-        if (pinotFieldDataType != avroFieldDataType) {
-          LOGGER.warn("Pinot field: {} of type: {} mismatches with 
corresponding field in Avro Schema of type: {}",
-              fieldName, pinotFieldDataType, avroFieldDataType);
-        }
-      }
-    }
-  }
-
   @Override
   public boolean hasNext() {
     return _avroReader.hasNext();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
index 5f7120d..43f8391 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
@@ -19,5 +19,5 @@
 package org.apache.pinot.core.data.readers;
 
 public enum FileFormat {
-  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, OTHER
+  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, OTHER
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ParquetRecordReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ParquetRecordReader.java
new file mode 100644
index 0000000..6067540
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ParquetRecordReader.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.readers;
+
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.ParquetUtils;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.util.AvroUtils;
+
+import java.io.File;
+import java.io.IOException;
+
+
+/**
+ * Record reader for Parquet file.
+ */
+public class ParquetRecordReader implements RecordReader {
+  private final String _dataFilePath;
+  private final Schema _schema;
+  private final List<FieldSpec> _fieldSpecs;
+
+  private ParquetReader<GenericRecord> _reader;
+  private GenericRecord _next;
+  private boolean _hasNext;
+
+  public ParquetRecordReader(File dataFile, Schema schema)
+      throws IOException {
+    _dataFilePath = dataFile.getAbsolutePath();
+    _schema = schema;
+    _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema);
+
+    _reader = ParquetUtils.getParquetReader(_dataFilePath);
+    advanceToNext();
+
+    AvroUtils.validateSchema(_schema, 
ParquetUtils.getParquetSchema(_dataFilePath));
+  }
+
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _hasNext;
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    for (FieldSpec fieldSpec : _fieldSpecs) {
+      String fieldName = fieldSpec.getName();
+      Object value = _next.get(fieldName);
+      // Allow default value for non-time columns
+      if (value != null || fieldSpec.getFieldType() != 
FieldSpec.FieldType.TIME) {
+        reuse.putField(fieldName, RecordReaderUtils.convert(fieldSpec, value));
+      }
+    }
+    advanceToNext();
+    return reuse;
+  }
+
+  @Override
+  public void rewind()
+      throws IOException {
+    _reader = ParquetUtils.getParquetReader(_dataFilePath);
+    advanceToNext();
+  }
+
+  @Override
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _reader.close();
+  }
+
+  private void advanceToNext() {
+    try {
+      _next = _reader.read();
+      _hasNext = (_next != null);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed while reading parquet file: " + 
_dataFilePath, e);
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
index 7909905..740004f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
@@ -70,6 +70,8 @@ public class RecordReaderFactory {
       // NOTE: PinotSegmentRecordReader does not support time conversion 
(field spec must match)
       case PINOT:
         return new PinotSegmentRecordReader(dataFile, schema, 
segmentGeneratorConfig.getColumnSortOrder());
+      case PARQUET:
+        return new ParquetRecordReader(dataFile, schema);
       default:
         throw new UnsupportedOperationException("Unsupported input file 
format: " + fileFormat);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
index a6b4dd5..b11c252 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import javax.annotation.Nullable;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.common.data.DimensionFieldSpec;
@@ -38,9 +40,14 @@ import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.MetricFieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.data.TimeFieldSpec;
+import org.apache.pinot.core.data.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class AvroUtils {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AvroUtils.class);
+
   private AvroUtils() {
   }
 
@@ -268,4 +275,33 @@ public class AvroUtils {
       return fieldSchema;
     }
   }
+
+  /**
+   * Validates the Pinot table schema against the given Avro schema.
+   */
+  public static void validateSchema(Schema schema, org.apache.avro.Schema 
avroSchema) {
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      String fieldName = fieldSpec.getName();
+      Field avroField = avroSchema.getField(fieldName);
+      if (avroField == null) {
+        LOGGER.warn("Pinot field: {} does not exist in Avro Schema", 
fieldName);
+      } else {
+        boolean isPinotFieldSingleValue = fieldSpec.isSingleValueField();
+        boolean isAvroFieldSingleValue = 
AvroUtils.isSingleValueField(avroField);
+        if (isPinotFieldSingleValue != isAvroFieldSingleValue) {
+          String errorMessage = "Pinot field: " + fieldName + " is " + 
(isPinotFieldSingleValue ? "Single" : "Multi")
+                  + "-valued in Pinot schema but not in Avro schema";
+          LOGGER.error(errorMessage);
+          throw new IllegalStateException(errorMessage);
+        }
+
+        FieldSpec.DataType pinotFieldDataType = fieldSpec.getDataType();
+        FieldSpec.DataType avroFieldDataType = 
AvroUtils.extractFieldDataType(avroField);
+        if (pinotFieldDataType != avroFieldDataType) {
+          LOGGER.warn("Pinot field: {} of type: {} mismatches with 
corresponding field in Avro Schema of type: {}",
+                  fieldName, pinotFieldDataType, avroFieldDataType);
+        }
+      }
+    }
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/ParquetRecordReaderTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/ParquetRecordReaderTest.java
new file mode 100644
index 0000000..65ef0ede
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/ParquetRecordReaderTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.readers;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.pinot.common.utils.ParquetUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class ParquetRecordReaderTest extends RecordReaderTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"ParquetRecordReaderTest");
+  private static final File DATA_FILE = new File(TEMP_DIR, "data.parquet");
+  private static final String DATA_FILE_PATH = DATA_FILE.getAbsolutePath();
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.forceMkdir(TEMP_DIR);
+
+    String strSchema =
+        "{\n" + "    \"name\": \"AvroParquetTest\",\n" + "    \"type\": 
\"record\",\n" + "    \"fields\": [\n"
+            + "        {\n" + "            \"name\": \"INT_SV\",\n" + "        
    \"type\": [ \"int\", \"null\"],\n"
+            + "            \"default\": 0 \n" + "        },\n" + "        {\n" 
+ "            \"name\": \"INT_MV\",\n"
+            + "            \"type\": [{\n" + "                \"type\": 
\"array\",\n"
+            + "                \"items\": \"int\"\n" + "             }, 
\"null\"]\n" + "        }\n" + "    ]\n" + "}";
+
+    Schema schema = new Schema.Parser().parse(strSchema);
+    List<GenericRecord> records = new ArrayList<>();
+
+    for (Object[] r : RECORDS) {
+      GenericRecord record = new GenericData.Record(schema);
+      if (r[0] != null) {
+        record.put("INT_SV", r[0]);
+      } else {
+        record.put("INT_SV", 0);
+      }
+
+      if (r[1] != null) {
+        record.put("INT_MV", r[1]);
+      } else {
+        record.put("INT_MV", new int[]{-1});
+      }
+
+      records.add(record);
+    }
+
+    ParquetWriter<GenericRecord> writer = 
ParquetUtils.getParquetWriter(DATA_FILE_PATH, schema);
+    try {
+      for (GenericRecord r : records) {
+        writer.write(r);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test
+  public void testParquetRecordReader()
+      throws Exception {
+    try (ParquetRecordReader recordReader = new ParquetRecordReader(DATA_FILE, 
SCHEMA)) {
+      checkValue(recordReader);
+      recordReader.rewind();
+      checkValue(recordReader);
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    FileUtils.forceDelete(TEMP_DIR);
+  }
+}
diff --git 
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
index 010be24..59606a0 100644
--- 
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
+++ 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
@@ -109,7 +109,7 @@ public class SegmentCreationJob extends BaseSegmentJob {
       return true;
     }
     return fileName.endsWith(".avro") || fileName.endsWith(".csv") || 
fileName.endsWith(".json") || fileName
-        .endsWith(".thrift");
+        .endsWith(".thrift") || fileName.endsWith(".parquet");
   }
 
   public void run()
diff --git 
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
index 21f3f16..4c99880 100644
--- 
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++ 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -276,6 +276,9 @@ public class SegmentCreationMapper extends 
Mapper<LongWritable, Text, LongWritab
     if (fileName.endsWith(".thrift")) {
       return FileFormat.THRIFT;
     }
+    if (fileName.endsWith(".parquet")) {
+      return FileFormat.PARQUET;
+    }
     throw new IllegalArgumentException("Unsupported file format: {}" + 
fileName);
   }
 
diff --git a/pom.xml b/pom.xml
index 0756f8b..bdc2e4c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,7 @@
     <SKIP_INTEGRATION_TESTS>true</SKIP_INTEGRATION_TESTS>
     <!-- Configuration for unit/integration tests section 1 of 3 (properties) 
ENDS HERE.-->
     <avro.version>1.7.6</avro.version>
+    <parquet.version>1.8.0</parquet.version>
     <helix.version>0.8.2</helix.version>
     <!-- jfim: for Kafka 0.9.0.0, use zkclient 0.7 -->
     <kafka.version>0.9.0.1</kafka.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to