This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d1bad1d29a Infer parquet reader type based on file metadata (#9294)
d1bad1d29a is described below
commit d1bad1d29a435386dc945e65d5e875ed7cb2e2ba
Author: Saurabh Dubey <[email protected]>
AuthorDate: Thu Sep 1 23:35:09 2022 +0530
Infer parquet reader type based on file metadata (#9294)
---
.../inputformat/parquet/ParquetRecordReader.java | 14 +++++++++++--
.../parquet/ParquetRecordReaderConfig.java | 16 ++++++++++++--
.../plugin/inputformat/parquet/ParquetUtils.java | 23 +++++++++++++++------
.../parquet/ParquetRecordReaderTest.java | 23 +++++++++++++++++++--
.../src/test/resources/data-avro.parquet | Bin 0 -> 18571 bytes
5 files changed, 64 insertions(+), 12 deletions(-)
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 790e97fd2e..6be22851fc 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.hadoop.fs.Path;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
@@ -38,11 +39,20 @@ public class ParquetRecordReader implements RecordReader {
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead,
@Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
- if (recordReaderConfig == null || ((ParquetRecordReaderConfig)
recordReaderConfig).useParquetAvroRecordReader()) {
+ if (recordReaderConfig != null && ((ParquetRecordReaderConfig)
recordReaderConfig).useParquetAvroRecordReader()) {
_internalParquetRecordReader = new ParquetAvroRecordReader();
- } else {
+ } else if (recordReaderConfig != null
+ && ((ParquetRecordReaderConfig)
recordReaderConfig).useParquetNativeRecordReader()) {
_useAvroParquetRecordReader = false;
_internalParquetRecordReader = new ParquetNativeRecordReader();
+ } else {
+ // No reader type specified. Determine using file metadata
+ if (ParquetUtils.hasAvroSchemaInFileMetadata(new
Path(dataFile.getAbsolutePath()))) {
+ _internalParquetRecordReader = new ParquetAvroRecordReader();
+ } else {
+ _useAvroParquetRecordReader = false;
+ _internalParquetRecordReader = new ParquetNativeRecordReader();
+ }
}
_internalParquetRecordReader.init(dataFile, fieldsToRead,
recordReaderConfig);
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
index d6bdce2609..e6da878589 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java
@@ -27,7 +27,10 @@ import org.apache.pinot.spi.data.readers.RecordReaderConfig;
*/
public class ParquetRecordReaderConfig implements RecordReaderConfig {
private static final String USE_PARQUET_AVRO_RECORDER_READER =
"useParquetAvroRecordReader";
- private boolean _useParquetAvroRecordReader = true;
+ private static final String USE_PARQUET_NATIVE_RECORDER_READER =
"useParquetNativeRecordReader";
+
+ private boolean _useParquetAvroRecordReader;
+ private boolean _useParquetNativeRecordReader;
private Configuration _conf;
public ParquetRecordReaderConfig() {
@@ -35,13 +38,22 @@ public class ParquetRecordReaderConfig implements
RecordReaderConfig {
public ParquetRecordReaderConfig(Configuration conf) {
_conf = conf;
- _useParquetAvroRecordReader =
conf.getBoolean(USE_PARQUET_AVRO_RECORDER_READER, true);
+ _useParquetAvroRecordReader =
conf.getBoolean(USE_PARQUET_AVRO_RECORDER_READER, false);
+ _useParquetNativeRecordReader =
conf.getBoolean(USE_PARQUET_NATIVE_RECORDER_READER, false);
}
public boolean useParquetAvroRecordReader() {
return _useParquetAvroRecordReader;
}
+ public boolean useParquetNativeRecordReader() {
+ return _useParquetNativeRecordReader;
+ }
+
+ public void setUseParquetNativeRecordReader(boolean
useParquetNativeRecordReader) {
+ _useParquetNativeRecordReader = useParquetNativeRecordReader;
+ }
+
public void setUseParquetAvroRecordReader(boolean
useParquetAvroRecordReader) {
_useParquetAvroRecordReader = useParquetAvroRecordReader;
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index c3a6bd7a2d..a55e3fa702 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -38,6 +38,8 @@ import org.apache.parquet.schema.MessageType;
public class ParquetUtils {
private static final String DEFAULT_FS = "file:///";
+ private static final String AVRO_SCHEMA_METADATA_KEY = "parquet.avro.schema";
+ private static final String OLD_AVRO_SCHEMA_METADATA_KEY = "avro.schema";
private ParquetUtils() {
}
@@ -69,12 +71,13 @@ public class ParquetUtils {
ParquetMetadata footer =
ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path,
ParquetMetadataConverter.NO_FILTER);
Map<String, String> metaData =
footer.getFileMetaData().getKeyValueMetaData();
- String schemaString = metaData.get("parquet.avro.schema");
- if (schemaString == null) {
- // Try the older property
- schemaString = metaData.get("avro.schema");
- }
- if (schemaString != null) {
+
+ if (hasAvroSchemaInFileMetadata(path)) {
+ String schemaString = metaData.get(AVRO_SCHEMA_METADATA_KEY);
+ if (schemaString == null) {
+ // Try the older property
+ schemaString = metaData.get(OLD_AVRO_SCHEMA_METADATA_KEY);
+ }
return new Schema.Parser().parse(schemaString);
} else {
MessageType parquetSchema = footer.getFileMetaData().getSchema();
@@ -82,6 +85,14 @@ public class ParquetUtils {
}
}
+ public static boolean hasAvroSchemaInFileMetadata(Path path) throws
IOException {
+ ParquetMetadata footer =
+ ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path,
ParquetMetadataConverter.NO_FILTER);
+ Map<String, String> metaData =
footer.getFileMetaData().getKeyValueMetaData();
+
+ return metaData.containsKey(AVRO_SCHEMA_METADATA_KEY) ||
metaData.containsKey(OLD_AVRO_SCHEMA_METADATA_KEY);
+ }
+
public static Configuration getParquetHadoopConfiguration() {
// The file path used in ParquetRecordReader is a local file path without
prefix 'file:///',
// so we have to make sure that the configuration item 'fs.defaultFS' is
set to 'file:///'
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index f39df45495..14dda0e5d1 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -99,6 +99,23 @@ public class ParquetRecordReaderTest extends
AbstractRecordReaderTest {
testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE);
}
+ @Test
+ public void testFileMetadataParsing()
+ throws IOException {
+ final ParquetRecordReader parquetRecordReader = new ParquetRecordReader();
+ File avroParquetFile = new
File(getClass().getClassLoader().getResource("data-avro.parquet").getFile());
+ parquetRecordReader.init(avroParquetFile, null, null);
+ // No config passed, so the reader type is inferred: file metadata has an avro schema -> avro reader
+ Assert.assertTrue(parquetRecordReader.useAvroParquetRecordReader());
+
+
+ final ParquetRecordReader parquetRecordReader2 = new ParquetRecordReader();
+ File nativeParquetFile = new
File(getClass().getClassLoader().getResource("users.parquet").getFile());
+ parquetRecordReader2.init(nativeParquetFile, null, null);
+ // Fresh reader, no config: file metadata does not have an avro schema -> native reader
+ Assert.assertFalse(parquetRecordReader2.useAvroParquetRecordReader());
+ }
+
@Test
public void testComparison()
throws IOException {
@@ -116,10 +133,12 @@ public class ParquetRecordReaderTest extends
AbstractRecordReaderTest {
private void testComparison(File dataFile, int totalRecords)
throws IOException {
final ParquetRecordReader avroRecordReader = new ParquetRecordReader();
- avroRecordReader.init(dataFile, null, null);
+ ParquetRecordReaderConfig avroRecordReaderConfig = new
ParquetRecordReaderConfig();
+ avroRecordReaderConfig.setUseParquetAvroRecordReader(true);
+ avroRecordReader.init(dataFile, null, avroRecordReaderConfig);
final ParquetRecordReader nativeRecordReader = new ParquetRecordReader();
ParquetRecordReaderConfig parquetRecordReaderConfig = new
ParquetRecordReaderConfig();
- parquetRecordReaderConfig.setUseParquetAvroRecordReader(false);
+ parquetRecordReaderConfig.setUseParquetNativeRecordReader(true);
nativeRecordReader.init(dataFile, null, parquetRecordReaderConfig);
Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/data-avro.parquet
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/data-avro.parquet
new file mode 100644
index 0000000000..b95c429113
Binary files /dev/null and
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/data-avro.parquet
differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]