This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d1bad1d29a Infer parquet reader type based on file metadata (#9294)
d1bad1d29a is described below

commit d1bad1d29a435386dc945e65d5e875ed7cb2e2ba
Author: Saurabh Dubey <[email protected]>
AuthorDate: Thu Sep 1 23:35:09 2022 +0530

    Infer parquet reader type based on file metadata (#9294)
---
 .../inputformat/parquet/ParquetRecordReader.java   |  14 +++++++++++--
 .../parquet/ParquetRecordReaderConfig.java         |  16 ++++++++++++--
 .../plugin/inputformat/parquet/ParquetUtils.java   |  23 +++++++++++++++------
 .../parquet/ParquetRecordReaderTest.java           |  23 +++++++++++++++++++--
 .../src/test/resources/data-avro.parquet           | Bin 0 -> 18571 bytes
 5 files changed, 64 insertions(+), 12 deletions(-)

diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 790e97fd2e..6be22851fc 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.hadoop.fs.Path;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
@@ -38,11 +39,20 @@ public class ParquetRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, 
@Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    if (recordReaderConfig == null || ((ParquetRecordReaderConfig) 
recordReaderConfig).useParquetAvroRecordReader()) {
+    if (recordReaderConfig != null && ((ParquetRecordReaderConfig) 
recordReaderConfig).useParquetAvroRecordReader()) {
       _internalParquetRecordReader = new ParquetAvroRecordReader();
-    } else {
+    } else if (recordReaderConfig != null
+        && ((ParquetRecordReaderConfig) 
recordReaderConfig).useParquetNativeRecordReader()) {
       _useAvroParquetRecordReader = false;
       _internalParquetRecordReader = new ParquetNativeRecordReader();
+    } else {
+      // No reader type specified. Determine using file metadata
+      if (ParquetUtils.hasAvroSchemaInFileMetadata(new 
Path(dataFile.getAbsolutePath()))) {
+        _internalParquetRecordReader = new ParquetAvroRecordReader();
+      } else {
+        _useAvroParquetRecordReader = false;
+        _internalParquetRecordReader = new ParquetNativeRecordReader();
+      }
     }
     _internalParquetRecordReader.init(dataFile, fieldsToRead, 
recordReaderConfig);
   }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
index d6bdce2609..e6da878589 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
@@ -27,7 +27,10 @@ import org.apache.pinot.spi.data.readers.RecordReaderConfig;
  */
 public class ParquetRecordReaderConfig implements RecordReaderConfig {
   private static final String USE_PARQUET_AVRO_RECORDER_READER = 
"useParquetAvroRecordReader";
-  private boolean _useParquetAvroRecordReader = true;
+  private static final String USE_PARQUET_NATIVE_RECORDER_READER = 
"useParquetNativeRecordReader";
+
+  private boolean _useParquetAvroRecordReader;
+  private boolean _useParquetNativeRecordReader;
   private Configuration _conf;
 
   public ParquetRecordReaderConfig() {
@@ -35,13 +38,22 @@ public class ParquetRecordReaderConfig implements 
RecordReaderConfig {
 
   public ParquetRecordReaderConfig(Configuration conf) {
     _conf = conf;
-    _useParquetAvroRecordReader = 
conf.getBoolean(USE_PARQUET_AVRO_RECORDER_READER, true);
+    _useParquetAvroRecordReader = 
conf.getBoolean(USE_PARQUET_AVRO_RECORDER_READER, false);
+    _useParquetNativeRecordReader = 
conf.getBoolean(USE_PARQUET_NATIVE_RECORDER_READER, false);
   }
 
   public boolean useParquetAvroRecordReader() {
     return _useParquetAvroRecordReader;
   }
 
+  public boolean useParquetNativeRecordReader() {
+    return _useParquetNativeRecordReader;
+  }
+
+  public void setUseParquetNativeRecordReader(boolean 
useParquetNativeRecordReader) {
+    _useParquetNativeRecordReader = useParquetNativeRecordReader;
+  }
+
   public void setUseParquetAvroRecordReader(boolean 
useParquetAvroRecordReader) {
     _useParquetAvroRecordReader = useParquetAvroRecordReader;
   }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index c3a6bd7a2d..a55e3fa702 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -38,6 +38,8 @@ import org.apache.parquet.schema.MessageType;
 
 public class ParquetUtils {
   private static final String DEFAULT_FS = "file:///";
+  private static final String AVRO_SCHEMA_METADATA_KEY = "parquet.avro.schema";
+  private static final String OLD_AVRO_SCHEMA_METADATA_KEY = "avro.schema";
 
   private ParquetUtils() {
   }
@@ -69,12 +71,13 @@ public class ParquetUtils {
     ParquetMetadata footer =
         ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path, 
ParquetMetadataConverter.NO_FILTER);
     Map<String, String> metaData = 
footer.getFileMetaData().getKeyValueMetaData();
-    String schemaString = metaData.get("parquet.avro.schema");
-    if (schemaString == null) {
-      // Try the older property
-      schemaString = metaData.get("avro.schema");
-    }
-    if (schemaString != null) {
+
+    if (hasAvroSchemaInFileMetadata(path)) {
+      String schemaString = metaData.get(AVRO_SCHEMA_METADATA_KEY);
+      if (schemaString == null) {
+        // Try the older property
+        schemaString = metaData.get(OLD_AVRO_SCHEMA_METADATA_KEY);
+      }
       return new Schema.Parser().parse(schemaString);
     } else {
       MessageType parquetSchema = footer.getFileMetaData().getSchema();
@@ -82,6 +85,14 @@ public class ParquetUtils {
     }
   }
 
+  public static boolean hasAvroSchemaInFileMetadata(Path path) throws 
IOException {
+    ParquetMetadata footer =
+        ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path, 
ParquetMetadataConverter.NO_FILTER);
+    Map<String, String> metaData = 
footer.getFileMetaData().getKeyValueMetaData();
+
+    return metaData.containsKey(AVRO_SCHEMA_METADATA_KEY) || 
metaData.containsKey(OLD_AVRO_SCHEMA_METADATA_KEY);
+  }
+
   public static Configuration getParquetHadoopConfiguration() {
     // The file path used in ParquetRecordReader is a local file path without 
prefix 'file:///',
     // so we have to make sure that the configuration item 'fs.defaultFS' is 
set to 'file:///'
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index f39df45495..14dda0e5d1 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -99,6 +99,23 @@ public class ParquetRecordReaderTest extends 
AbstractRecordReaderTest {
     testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE);
   }
 
+  @Test
+  public void testFileMetadataParsing()
+      throws IOException {
+    final ParquetRecordReader parquetRecordReader = new ParquetRecordReader();
+    File avroParquetFile = new 
File(getClass().getClassLoader().getResource("data-avro.parquet").getFile());
+    parquetRecordReader.init(avroParquetFile, null, null);
+    // No config given: reader type is inferred, and this file's metadata has an avro schema
+    Assert.assertTrue(parquetRecordReader.useAvroParquetRecordReader());
+
+    // Use a second, fresh reader so the native-file check does not re-init the first reader
+    final ParquetRecordReader parquetRecordReader2 = new ParquetRecordReader();
+    File nativeParquetFile = new 
File(getClass().getClassLoader().getResource("users.parquet").getFile());
+    parquetRecordReader2.init(nativeParquetFile, null, null);
+    // No config given: this file's metadata has no avro schema, so the native reader is chosen
+    Assert.assertFalse(parquetRecordReader2.useAvroParquetRecordReader());
+  }
+
   @Test
   public void testComparison()
       throws IOException {
@@ -116,10 +133,12 @@ public class ParquetRecordReaderTest extends 
AbstractRecordReaderTest {
   private void testComparison(File dataFile, int totalRecords)
       throws IOException {
     final ParquetRecordReader avroRecordReader = new ParquetRecordReader();
-    avroRecordReader.init(dataFile, null, null);
+    ParquetRecordReaderConfig avroRecordReaderConfig = new 
ParquetRecordReaderConfig();
+    avroRecordReaderConfig.setUseParquetAvroRecordReader(true);
+    avroRecordReader.init(dataFile, null, avroRecordReaderConfig);
     final ParquetRecordReader nativeRecordReader = new ParquetRecordReader();
     ParquetRecordReaderConfig parquetRecordReaderConfig = new 
ParquetRecordReaderConfig();
-    parquetRecordReaderConfig.setUseParquetAvroRecordReader(false);
+    parquetRecordReaderConfig.setUseParquetNativeRecordReader(true);
     nativeRecordReader.init(dataFile, null, parquetRecordReaderConfig);
     Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
     Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/data-avro.parquet
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/data-avro.parquet
new file mode 100644
index 0000000000..b95c429113
Binary files /dev/null and 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/data-avro.parquet
 differ


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to