This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b08469cf PARQUET-2347: Add interface layer between Parquet and Hadoop Configuration (#1141)
1b08469cf is described below

commit 1b08469cf9287bac218210abd76ed696682c257d
Author: Atour <[email protected]>
AuthorDate: Mon Oct 30 02:53:07 2023 +0100

    PARQUET-2347: Add interface layer between Parquet and Hadoop Configuration (#1141)
---
 .../parquet/avro/AvroParquetOutputFormat.java      |   2 -
 .../org/apache/parquet/avro/AvroParquetReader.java |  24 +++
 .../org/apache/parquet/avro/AvroParquetWriter.java |  13 ++
 .../org/apache/parquet/avro/AvroReadSupport.java   |  21 ++-
 .../apache/parquet/avro/AvroSchemaConverter.java   |   6 +
 .../org/apache/parquet/avro/AvroWriteSupport.java  |  12 +-
 .../org/apache/parquet/avro/TestReadWrite.java     |  62 +++++--
 .../parquet/avro/TestReadWriteOldListBehavior.java |   7 +
 .../apache/parquet/conf/ParquetConfiguration.java  | 178 +++++++++++++++++++++
 .../java/org/apache/parquet/util/Reflection.java   |  50 ++++++
 .../java/org/apache/parquet/HadoopReadOptions.java |  33 +---
 .../org/apache/parquet/ParquetReadOptions.java     |  80 ++++++++-
 .../parquet/conf/HadoopParquetConfiguration.java   | 140 ++++++++++++++++
 .../org/apache/parquet/hadoop/CodecFactory.java    |  22 ++-
 .../java/org/apache/parquet/hadoop/DirectZstd.java |  12 +-
 .../hadoop/InternalParquetRecordReader.java        |  10 +-
 .../apache/parquet/hadoop/ParquetInputFormat.java  |  25 +--
 .../org/apache/parquet/hadoop/ParquetReader.java   |  58 +++++--
 .../org/apache/parquet/hadoop/ParquetWriter.java   |  61 ++++++-
 .../parquet/hadoop/api/DelegatingReadSupport.java  |  10 ++
 .../parquet/hadoop/api/DelegatingWriteSupport.java |   6 +
 .../org/apache/parquet/hadoop/api/InitContext.java |  19 ++-
 .../org/apache/parquet/hadoop/api/ReadSupport.java |  47 +++++-
 .../apache/parquet/hadoop/api/WriteSupport.java    |  10 ++
 .../hadoop/example/ExampleParquetWriter.java       |   6 +
 .../parquet/hadoop/example/GroupReadSupport.java   |  16 ++
 .../parquet/hadoop/example/GroupWriteSupport.java  |  11 ++
 .../parquet/hadoop/util/ConfigurationUtil.java     |  26 ++-
 .../apache/parquet/hadoop/util/HadoopCodecs.java   |   5 +
 .../parquet/hadoop/util/SerializationUtil.java     |  18 ++-
 .../java/org/apache/parquet/DirectWriterTest.java  |   6 +
 .../SchemaControlEncryptionTest.java               |  16 ++
 .../parquet/hadoop/TestEncryptionOptions.java      |   2 +-
 .../org/apache/parquet/pig/TupleReadSupport.java   |  36 ++++-
 .../org/apache/parquet/pig/TupleWriteSupport.java  |   7 +
 .../parquet/pig/TestTupleRecordConsumer.java       |   3 +-
 .../apache/parquet/pig/TupleConsumerPerfTest.java  |   7 +-
 .../parquet/proto/ProtoMessageConverter.java       |  17 +-
 .../apache/parquet/proto/ProtoParquetWriter.java   |  14 +-
 .../org/apache/parquet/proto/ProtoReadSupport.java |   9 +-
 .../apache/parquet/proto/ProtoRecordConverter.java |   6 +
 .../parquet/proto/ProtoRecordMaterializer.java     |   6 +
 .../apache/parquet/proto/ProtoSchemaConverter.java |  14 +-
 .../apache/parquet/proto/ProtoWriteSupport.java    |   7 +
 .../hadoop/thrift/AbstractThriftWriteSupport.java  |  21 ++-
 .../parquet/hadoop/thrift/TBaseWriteSupport.java   |  12 +-
 .../hadoop/thrift/ThriftBytesWriteSupport.java     |  22 ++-
 .../parquet/hadoop/thrift/ThriftReadSupport.java   |  68 +++++++-
 .../parquet/hadoop/thrift/ThriftWriteSupport.java  |   6 +
 .../parquet/thrift/ParquetWriteProtocol.java       |   7 +
 .../parquet/thrift/TBaseRecordConverter.java       |   9 +-
 .../parquet/thrift/ThriftRecordConverter.java      |  15 +-
 .../parquet/thrift/ThriftSchemaConvertVisitor.java |  12 ++
 .../parquet/thrift/ThriftSchemaConverter.java      |  13 +-
 .../thrift/pig/TupleToThriftWriteSupport.java      |   9 +-
 .../parquet/thrift/TestParquetWriteProtocol.java   |   3 +-
 pom.xml                                            |   2 +
 57 files changed, 1206 insertions(+), 133 deletions(-)

diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
index afbaefcb0..9195925f7 100644
--- 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
+++ 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
@@ -19,9 +19,7 @@
 package org.apache.parquet.avro;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.apache.parquet.hadoop.util.ContextUtil;
 
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
index 8970b66ea..3c98948b6 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
@@ -26,6 +26,7 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.api.ReadSupport;
@@ -53,6 +54,10 @@ public class AvroParquetReader<T> extends ParquetReader<T> {
     return new Builder<T>(file);
   }
 
+  public static <T> Builder<T> builder(InputFile file, ParquetConfiguration 
conf) {
+    return new Builder<T>(file, conf);
+  }
+
   /**
    * Convenience method for creating a ParquetReader which uses Avro
    * {@link GenericData} objects to store data from reads.
@@ -67,6 +72,21 @@ public class AvroParquetReader<T> extends ParquetReader<T> {
     return new 
Builder<GenericRecord>(file).withDataModel(GenericData.get()).build();
   }
 
+  /**
+   * Convenience method for creating a ParquetReader which uses Avro
+   * {@link GenericData} objects to store data from reads.
+   *
+   * @param file The location to read data from
+   * @param conf The configuration to use
+   * @return A {@code ParquetReader} which reads data as Avro
+   *         {@code GenericData}
+   * @throws IOException if the InputFile has been closed, or if some other I/O
+   *           error occurs
+   */
+  public static ParquetReader<GenericRecord> genericRecordReader(InputFile 
file, ParquetConfiguration conf) throws IOException {
+    return new Builder<GenericRecord>(file, 
conf).withDataModel(GenericData.get()).build();
+  }
+
   /**
    * Convenience method for creating a ParquetReader which uses Avro
    * {@link GenericData} objects to store data from reads.
@@ -143,6 +163,10 @@ public class AvroParquetReader<T> extends ParquetReader<T> 
{
       super(file);
     }
 
+    private Builder(InputFile file, ParquetConfiguration conf) {
+      super(file, conf);
+    }
+
     public Builder<T> withDataModel(GenericData model) {
       this.model = model;
 
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
index 94d8167b0..0d87d007f 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
@@ -25,6 +25,8 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -154,6 +156,12 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> 
{
   private static <T> WriteSupport<T> writeSupport(Configuration conf,
                                                   Schema avroSchema,
                                                   GenericData model) {
+    return writeSupport(new HadoopParquetConfiguration(conf), avroSchema, 
model);
+  }
+
+  private static <T> WriteSupport<T> writeSupport(ParquetConfiguration conf,
+                                                  Schema avroSchema,
+                                                  GenericData model) {
     return new AvroWriteSupport<T>(
         new AvroSchemaConverter(conf).convert(avroSchema), avroSchema, model);
   }
@@ -189,5 +197,10 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> 
{
     protected WriteSupport<T> getWriteSupport(Configuration conf) {
       return AvroParquetWriter.writeSupport(conf, schema, model);
     }
+
+    @Override
+    protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
+      return AvroParquetWriter.writeSupport(conf, schema, model);
+    }
   }
 }
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index 8f268a145..0bda3d02e 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -24,7 +24,10 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
@@ -95,6 +98,13 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
   public ReadContext init(Configuration configuration,
                           Map<String, String> keyValueMetaData,
                           MessageType fileSchema) {
+    return init(new HadoopParquetConfiguration(configuration), 
keyValueMetaData, fileSchema);
+  }
+
+  @Override
+  public ReadContext init(ParquetConfiguration configuration,
+                          Map<String, String> keyValueMetaData,
+                          MessageType fileSchema) {
     MessageType projection = fileSchema;
     Map<String, String> metadata = new LinkedHashMap<String, String>();
 
@@ -120,6 +130,13 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
   public RecordMaterializer<T> prepareForRead(
       Configuration configuration, Map<String, String> keyValueMetaData,
       MessageType fileSchema, ReadContext readContext) {
+    return prepareForRead(new HadoopParquetConfiguration(configuration), 
keyValueMetaData, fileSchema, readContext);
+  }
+
+  @Override
+  public RecordMaterializer<T> prepareForRead(
+    ParquetConfiguration configuration, Map<String, String> keyValueMetaData,
+    MessageType fileSchema, ReadContext readContext) {
     Map<String, String> metadata = readContext.getReadSupportMetadata();
     MessageType parquetSchema = readContext.getRequestedSchema();
     Schema avroSchema;
@@ -153,7 +170,7 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
         parquetSchema, avroSchema, model);
   }
 
-  private GenericData getDataModel(Configuration conf, Schema schema) {
+  private GenericData getDataModel(ParquetConfiguration conf, Schema schema) {
     if (model != null) {
       return model;
     }
@@ -175,6 +192,6 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
 
     Class<? extends AvroDataSupplier> suppClass = conf.getClass(
         AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, 
AvroDataSupplier.class);
-    return ReflectionUtils.newInstance(suppClass, conf).get();
+    return ReflectionUtils.newInstance(suppClass, 
ConfigurationUtil.createHadoopConfiguration(conf)).get();
   }
 }
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 0314bcd71..abf94eaa2 100644
--- 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -23,6 +23,8 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -102,6 +104,10 @@ public class AvroSchemaConverter {
   }
 
   public AvroSchemaConverter(Configuration conf) {
+    this(new HadoopParquetConfiguration(conf));
+  }
+
+  public AvroSchemaConverter(ParquetConfiguration conf) {
     this.assumeRepeatedIsListElement = conf.getBoolean(
         ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
     this.writeOldListStructure = conf.getBoolean(
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 564e74539..692e3fac0 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -34,7 +34,10 @@ import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.GroupType;
@@ -129,6 +132,11 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
 
   @Override
   public WriteContext init(Configuration configuration) {
+    return init(new HadoopParquetConfiguration(configuration));
+  }
+
+  @Override
+  public WriteContext init(ParquetConfiguration configuration) {
     if (rootAvroSchema == null) {
       this.rootAvroSchema = new 
Schema.Parser().parse(configuration.get(AVRO_SCHEMA));
       this.rootSchema = new 
AvroSchemaConverter(configuration).convert(rootAvroSchema);
@@ -404,7 +412,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
     return Binary.fromCharSequence(value.toString());
   }
 
-  private static GenericData getDataModel(Configuration conf, Schema schema) {
+  private static GenericData getDataModel(ParquetConfiguration conf, Schema 
schema) {
     if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
       GenericData modelForSchema;
       try {
@@ -423,7 +431,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
 
     Class<? extends AvroDataSupplier> suppClass = conf.getClass(
         AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, 
AvroDataSupplier.class);
-    return ReflectionUtils.newInstance(suppClass, conf).get();
+    return ReflectionUtils.newInstance(suppClass, 
ConfigurationUtil.createHadoopConfiguration(conf)).get();
   }
 
   private abstract class ListWriter {
diff --git 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 81e751aba..9aaa9e3b2 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -52,6 +52,8 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -77,22 +79,31 @@ public class TestReadWrite {
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
     Object[][] data = new Object[][] {
-        { false, false },  // use the new converters
-        { true, false },   // use the old converters
-        { false, true } }; // use a local disk location
+        { false, false, false },  // use the new converters with hadoop config
+        { true, false, false },   // use the old converters with hadoop config
+        { false, true, false },   // use a local disk location with hadoop 
config
+        { false, false, true },   // use the new converters with parquet 
config interface
+        { true, false, true },    // use the old converters with parquet 
config interface
+        { false, true, true } };  // use a local disk location with parquet 
config interface
     return Arrays.asList(data);
   }
 
   private final boolean compat;
   private final boolean local;
+  private final boolean confInterface;
   private final Configuration testConf = new Configuration();
+  private final ParquetConfiguration parquetConf = new 
HadoopParquetConfiguration(true);
 
-  public TestReadWrite(boolean compat, boolean local) {
+  public TestReadWrite(boolean compat, boolean local, boolean confInterface) {
     this.compat = compat;
     this.local = local;
+    this.confInterface = confInterface;
     this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
-    testConf.setBoolean("parquet.avro.add-list-element-records", false);
-    testConf.setBoolean("parquet.avro.write-old-list-structure", false);
+    this.testConf.setBoolean("parquet.avro.add-list-element-records", false);
+    this.testConf.setBoolean("parquet.avro.write-old-list-structure", false);
+    this.parquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+    this.parquetConf.setBoolean("parquet.avro.add-list-element-records", 
false);
+    this.parquetConf.setBoolean("parquet.avro.write-old-list-structure", 
false);
   }
 
   @Test
@@ -431,6 +442,11 @@ public class TestReadWrite {
 
       @Override
       public WriteContext init(Configuration configuration) {
+        return init(new HadoopParquetConfiguration(configuration));
+      }
+
+      @Override
+      public WriteContext init(ParquetConfiguration configuration) {
         return new 
WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
             new HashMap<String, String>());
       }
@@ -864,30 +880,44 @@ public class TestReadWrite {
   }
 
   private ParquetWriter<GenericRecord> writer(String file, Schema schema) 
throws IOException {
+    AvroParquetWriter.Builder<GenericRecord> writerBuilder;
     if (local) {
-      return AvroParquetWriter
+      writerBuilder = AvroParquetWriter
         .<GenericRecord>builder(new LocalOutputFile(Paths.get(file)))
-        .withSchema(schema)
-        .withConf(testConf)
-        .build();
+        .withSchema(schema);
     } else {
-      return AvroParquetWriter
+      writerBuilder = AvroParquetWriter
         .<GenericRecord>builder(new Path(file))
-        .withSchema(schema)
+        .withSchema(schema);
+    }
+    if (confInterface) {
+      return writerBuilder
+        .withConf(parquetConf)
+        .build();
+    } else {
+      return writerBuilder
         .withConf(testConf)
         .build();
     }
   }
 
   private ParquetReader<GenericRecord> reader(String file) throws IOException {
+    AvroParquetReader.Builder<GenericRecord> readerBuilder;
     if (local) {
-      return AvroParquetReader
+      readerBuilder = AvroParquetReader
         .<GenericRecord>builder(new LocalInputFile(Paths.get(file)))
-        .withDataModel(GenericData.get())
-        .withConf(testConf)
+        .withDataModel(GenericData.get());
+    } else {
+      return new AvroParquetReader<>(testConf, new Path(file));
+    }
+    if (confInterface) {
+      return readerBuilder
+        .withConf(parquetConf)
         .build();
     } else {
-      return new AvroParquetReader(testConf, new Path(file));
+      return readerBuilder
+        .withConf(testConf)
+        .build();
     }
   }
 
diff --git 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
index f12417cae..31a221d5b 100644
--- 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
+++ 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
@@ -38,6 +38,8 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.api.Binary;
@@ -358,6 +360,11 @@ public class TestReadWriteOldListBehavior {
 
       @Override
       public WriteContext init(Configuration configuration) {
+        return init(new HadoopParquetConfiguration(configuration));
+      }
+
+      @Override
+      public WriteContext init(ParquetConfiguration configuration) {
         return new 
WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
             new HashMap<String, String>());
       }
diff --git a/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java b/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java
new file mode 100644
index 000000000..f8aae9729
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java
@@ -0,0 +1,178 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.conf;
+
+import java.util.Map;
+
+/**
+ * Configuration interface with the methods necessary to configure Parquet applications.
+ */
+public interface ParquetConfiguration extends Iterable<Map.Entry<String,String>> {
+
+  /**
+   * Sets the value of the name property.
+   *
+   * @param name the property to set
+   * @param value the value to set the property to
+   */
+  void set(String name, String value);
+
+  /**
+   * Sets the value of the name property to a long.
+   *
+   * @param name the property to set
+   * @param value the value to set the property to
+   */
+  void setLong(String name, long value);
+
+  /**
+   * Sets the value of the name property to an integer.
+   *
+   * @param name the property to set
+   * @param value the value to set the property to
+   */
+  void setInt(String name, int value);
+
+  /**
+   * Sets the value of the name property to a boolean.
+   *
+   * @param name the property to set
+   * @param value the value to set the property to
+   */
+  void setBoolean(String name, boolean value);
+
+  /**
+   * Sets the value of the name property to an array of comma delimited values.
+   *
+   * @param name the property to set
+   * @param value the values to set the property to
+   */
+  void setStrings(String name, String... value);
+
+  /**
+   * Sets the value of the name property to a class.
+   *
+   * @param name the property to set
+   * @param value the value to set the property to
+   * @param xface the interface implemented by the value
+   */
+  void setClass(String name, Class<?> value, Class<?> xface);
+
+  /**
+   * Gets the value of the name property. Returns null if no such value exists.
+   *
+   * @param name the property to retrieve the value of
+   * @return the value of the property, or null if it does not exist
+   */
+  String get(String name);
+
+  /**
+   * Gets the value of the name property. Returns the default value if no such value exists.
+   *
+   * @param name the property to retrieve the value of
+   * @param defaultValue the default return if no value is set for the property
+   * @return the value of the property, or the default value if it does not exist
+   */
+  String get(String name, String defaultValue);
+
+  /**
+   * Gets the value of the name property as a long. Returns the default value if no such value exists.
+   *
+   * @param name the property to retrieve the value of
+   * @param defaultValue the default return if no value is set for the property
+   * @return the value of the property as a long, or the default value if it does not exist
+   */
+  long getLong(String name, long defaultValue);
+
+  /**
+   * Gets the value of the name property as an integer. Returns the default value if no such value exists.
+   *
+   * @param name the property to retrieve the value of
+   * @param defaultValue the default return if no value is set for the property
+   * @return the value of the property as an integer, or the default value if it does not exist
+   */
+  int getInt(String name, int defaultValue);
+
+  /**
+   * Gets the value of the name property as a boolean. Returns the default value if no such value exists.
+   *
+   * @param name the property to retrieve the value of
+   * @param defaultValue the default return if no value is set for the property
+   * @return the value of the property as a boolean, or the default value if it does not exist
+   */
+  boolean getBoolean(String name, boolean defaultValue);
+
+  /**
+   * Gets the trimmed value of the name property. Returns null if no such value exists.
+   *
+   * @param name the property to retrieve the value of
+   * @return the trimmed value of the property, or null if it does not exist
+   */
+  String getTrimmed(String name);
+
+  /**
+   * Gets the trimmed value of the name property.
+   * Returns the default value if no such value exists.
+   *
+   * @param name the property to retrieve the value of
+   * @param defaultValue the default return if no value is set for the property
+   * @return the trimmed value of the property, or the default value if it does not exist
+   */
+  String getTrimmed(String name, String defaultValue);
+
+  /**
+   * Gets the value of the name property as an array of {@link String}s.
+   * Returns the default value if no such value exists.
+   * Interprets the stored value as a comma delimited array.
+   *
+   * @param name the property to retrieve the value of
+   * @param defaultValue the default return if no value is set for the property
+   * @return the value of the property as an array, or the default value if it does not exist
+   */
+  String[] getStrings(String name, String[] defaultValue);
+
+  /**
+   * Gets the value of the name property as a class. Returns the default value if no such value exists.
+   *
+   * @param name the property to retrieve the value of
+   * @param defaultValue the default return if no value is set for the property
+   * @return the value of the property as a class, or the default value if it does not exist
+   */
+  Class<?> getClass(String name, Class<?> defaultValue);
+
+  /**
+   * Gets the value of the name property as a class implementing the xface interface.
+   * Returns the default value if no such value exists.
+   *
+   * @param name the property to retrieve the value of
+   * @param defaultValue the default return if no value is set for the property
+   * @return the value of the property as a class, or the default value if it does not exist
+   */
+  <U> Class<? extends U> getClass(String name, Class<? extends U> defaultValue, Class<U> xface);
+
+  /**
+   * Load a class by name.
+   *
+   * @param name the name of the {@link Class} to load
+   * @return the loaded class
+   * @throws ClassNotFoundException when the specified class cannot be found
+   */
+  Class<?> getClassByName(String name) throws ClassNotFoundException;
+}
diff --git a/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java b/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java
new file mode 100644
index 000000000..695ebd9f4
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.util;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Lifted from Hadoop's org.apache.hadoop.util.ReflectionUtils.
+ */
+public class Reflection {
+
+  private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+  @SuppressWarnings("unchecked")
+  public static <T> T newInstance(Class<T> theClass) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return result;
+  }
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 8f0d8d893..9c9865010 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -23,29 +23,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
 import org.apache.parquet.crypto.DecryptionPropertiesFactory;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
-import org.apache.parquet.hadoop.util.HadoopCodecs;
 
 import java.util.Map;
 
-import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
-
 public class HadoopReadOptions extends ParquetReadOptions {
   private final Configuration conf;
 
-  private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";
-
   private HadoopReadOptions(boolean useSignedStringMinMax,
                             boolean useStatsFilter,
                             boolean useDictionaryFilter,
@@ -65,7 +53,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
     super(
         useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
         usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, codecFactory, allocator,
-        maxAllocationSize, properties, fileDecryptionProperties
+        maxAllocationSize, properties, fileDecryptionProperties, new HadoopParquetConfiguration(conf)
     );
     this.conf = conf;
   }
@@ -100,24 +88,9 @@ public class HadoopReadOptions extends ParquetReadOptions {
     }
 
     public Builder(Configuration conf, Path filePath) {
+      super(new HadoopParquetConfiguration(conf));
       this.conf = conf;
       this.filePath = filePath;
-      useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
-      useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
-      useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
-      useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
-      useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
-      usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED,
-        usePageChecksumVerification));
-      useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true));
-      useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false));
-      withCodecFactory(HadoopCodecs.newFactory(conf, 0));
-      withRecordFilter(getFilter(conf));
-      withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
-      String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
-      if (badRecordThresh != null) {
-        set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
-      }
     }
 
     @Override
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index dc130ee8d..8e93dc4ad 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -22,6 +22,8 @@ package org.apache.parquet;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -34,9 +36,21 @@ import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
 
 // Internal use only
 public class ParquetReadOptions {
+
+  private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";
+
   private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
   private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
   private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
@@ -61,6 +75,7 @@ public class ParquetReadOptions {
   private final int maxAllocationSize;
   private final Map<String, String> properties;
   private final FileDecryptionProperties fileDecryptionProperties;
+  private final ParquetConfiguration conf;
 
   ParquetReadOptions(boolean useSignedStringMinMax,
                      boolean useStatsFilter,
@@ -77,6 +92,28 @@ public class ParquetReadOptions {
                      int maxAllocationSize,
                      Map<String, String> properties,
                      FileDecryptionProperties fileDecryptionProperties) {
+    this(useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
+      usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter,
+      codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties,
+      new HadoopParquetConfiguration());
+  }
+
+  ParquetReadOptions(boolean useSignedStringMinMax,
+                     boolean useStatsFilter,
+                     boolean useDictionaryFilter,
+                     boolean useRecordFilter,
+                     boolean useColumnIndexFilter,
+                     boolean usePageChecksumVerification,
+                     boolean useBloomFilter,
+                     boolean useOffHeapDecryptBuffer,
+                     FilterCompat.Filter recordFilter,
+                     ParquetMetadataConverter.MetadataFilter metadataFilter,
+                     CompressionCodecFactory codecFactory,
+                     ByteBufferAllocator allocator,
+                     int maxAllocationSize,
+                     Map<String, String> properties,
+                     FileDecryptionProperties fileDecryptionProperties,
+                     ParquetConfiguration conf) {
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.useStatsFilter = useStatsFilter;
     this.useDictionaryFilter = useDictionaryFilter;
@@ -92,6 +129,7 @@ public class ParquetReadOptions {
     this.maxAllocationSize = maxAllocationSize;
     this.properties = Collections.unmodifiableMap(properties);
     this.fileDecryptionProperties = fileDecryptionProperties;
+    this.conf = conf;
   }
 
   public boolean useSignedStringMinMax() {
@@ -164,10 +202,18 @@ public class ParquetReadOptions {
         : defaultValue;
   }
 
+  public ParquetConfiguration getConfiguration() {
+    return conf;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
 
+  public static Builder builder(ParquetConfiguration conf) {
+    return new Builder(conf);
+  }
+
   public static class Builder {
     protected boolean useSignedStringMinMax = false;
     protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
@@ -185,6 +231,31 @@ public class ParquetReadOptions {
     protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT;
     protected Map<String, String> properties = new HashMap<>();
     protected FileDecryptionProperties fileDecryptionProperties = null;
+    protected ParquetConfiguration conf;
+
+    public Builder() {
+      this(new HadoopParquetConfiguration());
+    }
+
+    public Builder(ParquetConfiguration conf) {
+      this.conf = conf;
+      useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
+      useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
+      useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
+      useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
+      useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
+      usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED,
+        usePageChecksumVerification));
+      useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true));
+      useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false));
+      withCodecFactory(HadoopCodecs.newFactory(conf, 0));
+      withRecordFilter(getFilter(conf));
+      withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
+      String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
+      if (badRecordThresh != null) {
+        set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
+      }
+    }
 
     public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {
       this.useSignedStringMinMax = useSignedStringMinMax;
@@ -325,6 +396,7 @@ public class ParquetReadOptions {
       withAllocator(options.allocator);
       withPageChecksumVerification(options.usePageChecksumVerification);
       withDecryption(options.fileDecryptionProperties);
+      conf = options.conf;
       for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
         set(keyValue.getKey(), keyValue.getValue());
       }
@@ -333,13 +405,17 @@ public class ParquetReadOptions {
 
     public ParquetReadOptions build() {
       if (codecFactory == null) {
-        codecFactory = HadoopCodecs.newFactory(0);
+        if (conf == null) {
+          codecFactory = HadoopCodecs.newFactory(0);
+        } else {
+          codecFactory = HadoopCodecs.newFactory(conf, 0);
+        }
       }
 
       return new ParquetReadOptions(
         useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
         useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter,
-        codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties);
+        codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties, conf);
     }
   }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java b/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java
new file mode 100644
index 000000000..26fce1e9b
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java
@@ -0,0 +1,140 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Implementation of the Parquet configuration interface relying on Hadoop's
+ * Configuration to aid with interoperability and backwards compatibility.
+ */
+public class HadoopParquetConfiguration implements ParquetConfiguration {
+
+  private final Configuration configuration;
+
+  public HadoopParquetConfiguration() {
+    this(true);
+  }
+
+  public HadoopParquetConfiguration(boolean loadDefaults) {
+    configuration = new Configuration(loadDefaults);
+  }
+
+  public HadoopParquetConfiguration(Configuration conf) {
+    configuration = conf;
+  }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  @Override
+  public void set(String name, String value) {
+    configuration.set(name, value);
+  }
+
+  @Override
+  public void setLong(String name, long value) {
+    configuration.setLong(name, value);
+  }
+
+  @Override
+  public void setInt(String name, int value) {
+    configuration.setInt(name, value);
+  }
+
+  @Override
+  public void setBoolean(String name, boolean value) {
+    configuration.setBoolean(name, value);
+  }
+
+  @Override
+  public void setStrings(String name, String... values) {
+    configuration.setStrings(name, values);
+  }
+
+  @Override
+  public void setClass(String name, Class<?> value, Class<?> xface) {
+    configuration.setClass(name, value, xface);
+  }
+
+  @Override
+  public String get(String name) {
+    return configuration.get(name);
+  }
+
+  @Override
+  public String get(String name, String defaultValue) {
+    return configuration.get(name, defaultValue);
+  }
+
+  @Override
+  public long getLong(String name, long defaultValue) {
+    return configuration.getLong(name, defaultValue);
+  }
+
+  @Override
+  public int getInt(String name, int defaultValue) {
+    return configuration.getInt(name, defaultValue);
+  }
+
+  @Override
+  public boolean getBoolean(String name, boolean defaultValue) {
+    return configuration.getBoolean(name, defaultValue);
+  }
+
+  @Override
+  public String getTrimmed(String name) {
+    return configuration.getTrimmed(name);
+  }
+
+  @Override
+  public String getTrimmed(String name, String defaultValue) {
+    return configuration.getTrimmed(name, defaultValue);
+  }
+
+  @Override
+  public String[] getStrings(String name, String[] defaultValue) {
+    return configuration.getStrings(name, defaultValue);
+  }
+
+  @Override
+  public Class<?> getClass(String name, Class<?> defaultValue) {
+    return configuration.getClass(name, defaultValue);
+  }
+
+  @Override
+  public <U> Class<? extends U> getClass(String name, Class<? extends U> defaultValue, Class<U> xface) {
+    return configuration.getClass(name, defaultValue, xface);
+  }
+
+  @Override
+  public Class<?> getClassByName(String name) throws ClassNotFoundException {
+    return configuration.getClassByName(name);
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, String>> iterator() {
+    return configuration.iterator();
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index d93b4071c..a30c57aeb 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -37,8 +37,11 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
 
 public class CodecFactory implements CompressionCodecFactory {
 
@@ -48,7 +51,7 @@ public class CodecFactory implements CompressionCodecFactory {
   private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<>();
   private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<>();
 
-  protected final Configuration configuration;
+  protected final ParquetConfiguration configuration;
   protected final int pageSize;
 
   /**
@@ -61,6 +64,19 @@ public class CodecFactory implements CompressionCodecFactory {
    *                 decompressors this parameter has no impact on the function of the factory
    */
   public CodecFactory(Configuration configuration, int pageSize) {
+    this(new HadoopParquetConfiguration(configuration), pageSize);
+  }
+
+  /**
+   * Create a new codec factory.
+   *
+   * @param configuration used to pass compression codec configuration information
+   * @param pageSize the expected page size, does not set a hard limit, currently just
+   *                 used to set the initial size of the output stream used when
+   *                 compressing a buffer. If this factory is only used to construct
+   *                 decompressors this parameter has no impact on the function of the factory
+   */
+  public CodecFactory(ParquetConfiguration configuration, int pageSize) {
     this.configuration = configuration;
     this.pageSize = pageSize;
   }
@@ -246,9 +262,9 @@ public class CodecFactory implements CompressionCodecFactory {
         codecClass = Class.forName(codecClassName);
       } catch (ClassNotFoundException e) {
         // Try to load the class using the job classloader
-        codecClass = configuration.getClassLoader().loadClass(codecClassName);
+        codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName);
       }
-      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(configuration));
       CODEC_BY_NAME.put(codecCacheKey, codec);
       return codec;
     } catch (ClassNotFoundException e) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java
index 5454a69b2..588f93c89 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java
@@ -24,6 +24,8 @@ import com.github.luben.zstd.Zstd;
 import com.github.luben.zstd.ZstdOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.codec.ZstdDecompressorStream;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
@@ -50,6 +52,10 @@ import static org.apache.parquet.hadoop.codec.ZstandardCodec.PARQUET_COMPRESS_ZS
 class DirectZstd {
 
   static CodecFactory.BytesCompressor createCompressor(Configuration conf, int pageSize) {
+    return createCompressor(new HadoopParquetConfiguration(conf), pageSize);
+  }
+
+  static CodecFactory.BytesCompressor createCompressor(ParquetConfiguration conf, int pageSize) {
     return new ZstdCompressor(
       getPool(conf),
       conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
@@ -58,6 +64,10 @@ class DirectZstd {
   }
 
   static CodecFactory.BytesDecompressor createDecompressor(Configuration conf) {
+    return createDecompressor(new HadoopParquetConfiguration(conf));
+  }
+
+  static CodecFactory.BytesDecompressor createDecompressor(ParquetConfiguration conf) {
     return new ZstdDecompressor(getPool(conf));
   }
 
@@ -133,7 +143,7 @@ class DirectZstd {
     }
   }
 
-  private static BufferPool getPool(Configuration conf) {
+  private static BufferPool getPool(ParquetConfiguration conf) {
     if (conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) {
       return RecyclingBufferPool.INSTANCE;
     } else {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 8203e9098..36da819fa 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.PrimitiveIterator;
 import java.util.Set;
@@ -29,9 +30,9 @@ import java.util.stream.LongStream;
 
 import org.apache.hadoop.conf.Configuration;
 
-import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
@@ -167,10 +168,7 @@ class InternalParquetRecordReader<T> {
 
   public void initialize(ParquetFileReader reader, ParquetReadOptions options) {
     // copy custom configuration to the Configuration passed to the ReadSupport
-    Configuration conf = new Configuration();
-    if (options instanceof HadoopReadOptions) {
-      conf = ((HadoopReadOptions) options).getConf();
-    }
+    ParquetConfiguration conf = Objects.requireNonNull(options).getConfiguration();
     for (String property : options.getPropertyNames()) {
       conf.set(property, options.getProperty(property));
     }
@@ -261,7 +259,7 @@ class InternalParquetRecordReader<T> {
 
         LOG.debug("read value: {}", currentValue);
       } catch (RuntimeException e) {
-        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getPath()), e);
+        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getFile()), e);
       }
     }
     return true;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index 1bfd4b20f..7d355af78 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
@@ -193,18 +195,13 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     return ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
   }
 
-  private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
+  private static UnboundRecordFilter getUnboundRecordFilterInstance(ParquetConfiguration configuration) {
     Class<?> clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
-    if (clazz == null) { return null; }
-
+    if (clazz == null) {
+      return null;
+    }
     try {
-      UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance();
-
-      if (unboundRecordFilter instanceof Configurable) {
-        ((Configurable)unboundRecordFilter).setConf(configuration);
-      }
-
-      return unboundRecordFilter;
+      return (UnboundRecordFilter) clazz.newInstance();
     } catch (InstantiationException | IllegalAccessException e) {
       throw new BadConfigurationException(
           "could not instantiate unbound record filter class", e);
@@ -232,6 +229,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   private static FilterPredicate getFilterPredicate(Configuration configuration) {
+    return getFilterPredicate(new HadoopParquetConfiguration(configuration));
+  }
+
+  private static FilterPredicate getFilterPredicate(ParquetConfiguration configuration) {
     try {
       return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration);
     } catch (IOException e) {
@@ -247,6 +248,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    * @return a filter for the unbound record filter specified in conf
    */
   public static Filter getFilter(Configuration conf) {
+    return getFilter(new HadoopParquetConfiguration(conf));
+  }
+
+  public static Filter getFilter(ParquetConfiguration conf) {
     return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf));
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index f9c8314dd..785e5d05d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -36,11 +36,14 @@ import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
@@ -180,6 +183,10 @@ public class ParquetReader<T> implements Closeable {
     return new Builder<>(file);
   }
 
+  public static <T> Builder<T> read(InputFile file, ParquetConfiguration conf) throws IOException {
+    return new Builder<>(file, conf);
+  }
+
   public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
     return new Builder<>(readSupport, path);
   }
@@ -190,7 +197,7 @@ public class ParquetReader<T> implements Closeable {
     private final Path path;
     private Filter filter = null;
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
-    protected Configuration conf;
+    protected ParquetConfiguration conf;
     private ParquetReadOptions.Builder optionsBuilder;
 
     @Deprecated
@@ -198,8 +205,9 @@ public class ParquetReader<T> implements Closeable {
       this.readSupport = Objects.requireNonNull(readSupport, "readSupport cannot be null");
       this.file = null;
       this.path = Objects.requireNonNull(path, "path cannot be null");
-      this.conf = new Configuration();
-      this.optionsBuilder = HadoopReadOptions.builder(conf, path);
+      Configuration hadoopConf = new Configuration();
+      this.conf = new HadoopParquetConfiguration(hadoopConf);
+      this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path);
     }
 
     @Deprecated
@@ -207,8 +215,9 @@ public class ParquetReader<T> implements Closeable {
       this.readSupport = null;
       this.file = null;
       this.path = Objects.requireNonNull(path, "path cannot be null");
-      this.conf = new Configuration();
-      this.optionsBuilder = HadoopReadOptions.builder(conf, path);
+      Configuration hadoopConf = new Configuration();
+      this.conf = new HadoopParquetConfiguration(hadoopConf);
+      this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path);
     }
 
     protected Builder(InputFile file) {
@@ -217,17 +226,30 @@ public class ParquetReader<T> implements Closeable {
       this.path = null;
       if (file instanceof HadoopInputFile) {
         HadoopInputFile hadoopFile = (HadoopInputFile) file;
-        this.conf = hadoopFile.getConfiguration();
-        optionsBuilder = HadoopReadOptions.builder(conf, hadoopFile.getPath());
+        Configuration hadoopConf = hadoopFile.getConfiguration();
+        this.conf = new HadoopParquetConfiguration(hadoopConf);
+        optionsBuilder = HadoopReadOptions.builder(hadoopConf, hadoopFile.getPath());
       } else {
-        this.conf = new Configuration();
-        optionsBuilder = HadoopReadOptions.builder(conf);
+        optionsBuilder = ParquetReadOptions.builder(new HadoopParquetConfiguration());
+      }
+    }
+
+    protected Builder(InputFile file, ParquetConfiguration conf) {
+      this.readSupport = null;
+      this.file = Objects.requireNonNull(file, "file cannot be null");
+      this.path = null;
+      this.conf = conf;
+      if (file instanceof HadoopInputFile) {
+        HadoopInputFile hadoopFile = (HadoopInputFile) file;
+        optionsBuilder = HadoopReadOptions.builder(ConfigurationUtil.createHadoopConfiguration(conf), hadoopFile.getPath());
+      } else {
+        optionsBuilder = ParquetReadOptions.builder(conf);
       }
     }
 
     // when called, resets options to the defaults from conf
     public Builder<T> withConf(Configuration conf) {
-      this.conf = Objects.requireNonNull(conf, "conf cannot be null");
+      this.conf = new HadoopParquetConfiguration(Objects.requireNonNull(conf, "conf cannot be null"));
 
       // previous versions didn't use the builder, so may set filter before conf. this maintains
       // compatibility for filter. other options are reset by a new conf.
@@ -239,6 +261,15 @@ public class ParquetReader<T> implements Closeable {
       return this;
     }
 
+    public Builder<T> withConf(ParquetConfiguration conf) {
+      this.conf = conf;
+      this.optionsBuilder = ParquetReadOptions.builder(conf);
+      if (filter != null) {
+        optionsBuilder.withRecordFilter(filter);
+      }
+      return this;
+    }
+
     public Builder<T> withFilter(Filter filter) {
       this.filter = filter;
       optionsBuilder.withRecordFilter(filter);
@@ -354,19 +385,20 @@ public class ParquetReader<T> implements Closeable {
           .build();
 
       if (path != null) {
-        FileSystem fs = path.getFileSystem(conf);
+        Configuration hadoopConf = 
ConfigurationUtil.createHadoopConfiguration(conf);
+        FileSystem fs = path.getFileSystem(hadoopConf);
         FileStatus stat = fs.getFileStatus(path);
 
         if (stat.isFile()) {
           return new ParquetReader<>(
-              Collections.singletonList((InputFile) 
HadoopInputFile.fromStatus(stat, conf)),
+              Collections.singletonList((InputFile) 
HadoopInputFile.fromStatus(stat, hadoopConf)),
               options,
               getReadSupport());
 
         } else {
           List<InputFile> files = new ArrayList<>();
           for (FileStatus fileStatus : fs.listStatus(path, 
HiddenFileFilter.INSTANCE)) {
-            files.add(HadoopInputFile.fromStatus(fileStatus, conf));
+            files.add(HadoopInputFile.fromStatus(fileStatus, hadoopConf));
           }
           return new ParquetReader<T>(files, options, getReadSupport());
         }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 7b78a9376..2e888722e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -26,11 +26,14 @@ import org.apache.hadoop.fs.Path;
 
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.schema.MessageType;
@@ -276,6 +279,30 @@ public class ParquetWriter<T> implements Closeable {
       int maxPaddingSize,
       ParquetProperties encodingProps,
       FileEncryptionProperties encryptionProperties) throws IOException {
+    this(
+      file,
+      mode,
+      writeSupport,
+      compressionCodecName,
+      rowGroupSize,
+      validating,
+      new HadoopParquetConfiguration(conf),
+      maxPaddingSize,
+      encodingProps,
+      encryptionProperties);
+  }
+
+  ParquetWriter(
+      OutputFile file,
+      ParquetFileWriter.Mode mode,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      long rowGroupSize,
+      boolean validating,
+      ParquetConfiguration conf,
+      int maxPaddingSize,
+      ParquetProperties encodingProps,
+      FileEncryptionProperties encryptionProperties) throws IOException {
 
     WriteSupport.WriteContext writeContext = writeSupport.init(conf);
     MessageType schema = writeContext.getSchema();
@@ -283,8 +310,9 @@ public class ParquetWriter<T> implements Closeable {
     // encryptionProperties could be built from the implementation of 
EncryptionPropertiesFactory when it is attached.
     if (encryptionProperties == null) {
       String path = file == null ? null : file.getPath();
-      encryptionProperties = 
ParquetOutputFormat.createEncryptionProperties(conf,
-          path == null ? null : new Path(path), writeContext);
+      Configuration hadoopConf = 
ConfigurationUtil.createHadoopConfiguration(conf);
+      encryptionProperties = ParquetOutputFormat.createEncryptionProperties(
+        hadoopConf, path == null ? null : new Path(path), writeContext);
     }
 
     ParquetFileWriter fileWriter = new ParquetFileWriter(
@@ -353,7 +381,7 @@ public class ParquetWriter<T> implements Closeable {
     private OutputFile file = null;
     private Path path = null;
     private FileEncryptionProperties encryptionProperties = null;
-    private Configuration conf = new Configuration();
+    private ParquetConfiguration conf = null;
     private ParquetFileWriter.Mode mode;
     private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
     private long rowGroupSize = DEFAULT_BLOCK_SIZE;
@@ -381,6 +409,14 @@ public class ParquetWriter<T> implements Closeable {
      */
     protected abstract WriteSupport<T> getWriteSupport(Configuration conf);
 
+    /**
+     * @param conf a configuration
+     * @return an appropriate WriteSupport for the object model; defaults to {@link #getWriteSupport(Configuration)}
+     */
+    protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
+      return getWriteSupport(ConfigurationUtil.createHadoopConfiguration(conf));
+    }
+
     /**
      * Set the {@link Configuration} used by the constructed writer.
      *
@@ -388,6 +424,17 @@ public class ParquetWriter<T> implements Closeable {
      * @return this builder for method chaining.
      */
     public SELF withConf(Configuration conf) {
+      this.conf = new HadoopParquetConfiguration(conf);
+      return self();
+    }
+
+    /**
+     * Set the {@link ParquetConfiguration} used by the constructed writer.
+     *
+     * @param conf a {@code ParquetConfiguration}
+     * @return this builder for method chaining.
+     */
+    public SELF withConf(ParquetConfiguration conf) {
       this.conf = conf;
       return self();
     }
@@ -719,6 +766,9 @@ public class ParquetWriter<T> implements Closeable {
      * @return this builder for method chaining.
      */
     public SELF config(String property, String value) {
+      if (conf == null) {
+        conf = new HadoopParquetConfiguration();
+      }
       conf.set(property, value);
       return self();
     }
@@ -730,12 +780,15 @@ public class ParquetWriter<T> implements Closeable {
      * @throws IOException if there is an error while creating the writer
      */
     public ParquetWriter<T> build() throws IOException {
+      if (conf == null) {
+        conf = new HadoopParquetConfiguration();
+      }
       if (file != null) {
         return new ParquetWriter<>(file,
             mode, getWriteSupport(conf), codecName, rowGroupSize, 
enableValidation, conf,
             maxPaddingSize, encodingPropsBuilder.build(), 
encryptionProperties);
       } else {
-        return new ParquetWriter<>(HadoopOutputFile.fromPath(path, conf),
+        return new ParquetWriter<>(HadoopOutputFile.fromPath(path, 
ConfigurationUtil.createHadoopConfiguration(conf)),
             mode, getWriteSupport(conf), codecName,
             rowGroupSize, enableValidation, conf, maxPaddingSize,
             encodingPropsBuilder.build(), encryptionProperties);
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
index 8100a351f..2593393d7 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.schema.MessageType;
 
@@ -53,6 +54,15 @@ public class DelegatingReadSupport<T> extends ReadSupport<T> 
{
     return delegate.prepareForRead(configuration, keyValueMetaData, 
fileSchema, readContext);
   }
 
+  @Override
+  public RecordMaterializer<T> prepareForRead(
+      ParquetConfiguration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema,
+      ReadSupport.ReadContext readContext) {
+    return delegate.prepareForRead(configuration, keyValueMetaData, 
fileSchema, readContext);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getName() + "(" + delegate.toString() + ")";
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
index f5bbfc60d..926fe68c6 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
@@ -20,6 +20,7 @@ package org.apache.parquet.hadoop.api;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.io.api.RecordConsumer;
 
 /**
@@ -42,6 +43,11 @@ public class DelegatingWriteSupport<T> extends 
WriteSupport<T> {
     return delegate.init(configuration);
   }
 
+  @Override
+  public WriteSupport.WriteContext init(ParquetConfiguration configuration) {
+    return delegate.init(configuration);
+  }
+
   @Override
   public void prepareForWrite(RecordConsumer recordConsumer) {
     delegate.prepareForWrite(recordConsumer);
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java
index 6bc5e5d3d..3d80811aa 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java
@@ -25,6 +25,9 @@ import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
 
 /**
@@ -35,7 +38,7 @@ public class InitContext {
 
   private final Map<String,Set<String>> keyValueMetadata;
   private Map<String,String> mergedKeyValueMetadata;
-  private final Configuration configuration;
+  private final ParquetConfiguration configuration;
   private final MessageType fileSchema;
 
   /**
@@ -47,6 +50,13 @@ public class InitContext {
       Configuration configuration,
       Map<String, Set<String>> keyValueMetadata,
       MessageType fileSchema) {
+    this(new HadoopParquetConfiguration(configuration), keyValueMetadata, 
fileSchema);
+  }
+
+  public InitContext(
+      ParquetConfiguration configuration,
+      Map<String, Set<String>> keyValueMetadata,
+      MessageType fileSchema) {
     super();
     this.keyValueMetadata = keyValueMetadata;
     this.configuration = configuration;
@@ -77,6 +87,13 @@ public class InitContext {
    * @return the configuration for this job
    */
   public Configuration getConfiguration() {
+    return ConfigurationUtil.createHadoopConfiguration(configuration);
+  }
+
+  /**
+   * @return the Parquet configuration for this job
+   */
+  public ParquetConfiguration getParquetConfiguration() {
     return configuration;
   }
 
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java
index 62344522b..a3dfe2caa 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
@@ -69,10 +70,28 @@ abstract public class ReadSupport<T> {
    */
   @Deprecated
   public ReadContext init(
-          Configuration configuration,
-          Map<String, String> keyValueMetaData,
-          MessageType fileSchema) {
-    throw new UnsupportedOperationException("Override init(InitContext)");
+      Configuration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema) {
+    throw new UnsupportedOperationException("Override 
ReadSupport.init(InitContext)");
+  }
+
+  /**
+   * called in {@link org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)} in the front end
+   * (the default implementation delegates to the Hadoop {@link Configuration} overload)
+   * @param configuration    the configuration
+   * @param keyValueMetaData the app specific metadata from the file
+   * @param fileSchema       the schema of the file
+   * @return the readContext that defines how to read the file
+   *
+   * @deprecated override {@link ReadSupport#init(InitContext)} instead
+   */
+  @Deprecated
+  public ReadContext init(
+      ParquetConfiguration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema) {
+    return init(org.apache.parquet.hadoop.util.ConfigurationUtil.createHadoopConfiguration(configuration), keyValueMetaData, fileSchema);
   }
 
   /**
@@ -82,7 +101,7 @@ abstract public class ReadSupport<T> {
    * @return the readContext that defines how to read the file
    */
   public ReadContext init(InitContext context) {
-    return init(context.getConfiguration(), 
context.getMergedKeyValueMetaData(), context.getFileSchema());
+    return init(context.getParquetConfiguration(), 
context.getMergedKeyValueMetaData(), context.getFileSchema());
   }
 
   /**
@@ -101,6 +120,24 @@ abstract public class ReadSupport<T> {
           MessageType fileSchema,
           ReadContext readContext);
 
+  /**
+   * called in {@link org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)} in the back end
+   * the returned RecordMaterializer will materialize the records and add them to the destination
+   * (the default implementation delegates to the Hadoop {@link Configuration} overload)
+   * @param configuration    the configuration
+   * @param keyValueMetaData the app specific metadata from the file
+   * @param fileSchema       the schema of the file
+   * @param readContext      returned by the init method
+   * @return the recordMaterializer that will materialize the records
+   */
+  public RecordMaterializer<T> prepareForRead(
+      ParquetConfiguration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema,
+      ReadContext readContext) {
+    return prepareForRead(org.apache.parquet.hadoop.util.ConfigurationUtil.createHadoopConfiguration(configuration), keyValueMetaData, fileSchema, readContext);
+  }
+
   /**
    * information to read the file
    */
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java
index 9549d5f49..b73e102c2 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
 
@@ -105,6 +106,15 @@ abstract public class WriteSupport<T> {
    */
   public abstract WriteContext init(Configuration configuration);
 
+  /**
+   * called first in the task; by default delegates to {@link #init(Configuration)}
+   * @param configuration the job's configuration
+   * @return the information needed to write the file
+   */
+  public WriteContext init(ParquetConfiguration configuration) {
+    return init(org.apache.parquet.hadoop.util.ConfigurationUtil.createHadoopConfiguration(configuration));
+  }
+
   /**
    * This will be called once per row group
    * @param recordConsumer the recordConsumer to write to
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
index 12a67d301..a151b4fce 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.hadoop.example;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -111,6 +112,11 @@ public class ExampleParquetWriter extends 
ParquetWriter<Group> {
 
     @Override
     protected WriteSupport<Group> getWriteSupport(Configuration conf) {
+      return getWriteSupport((ParquetConfiguration) null);
+    }
+
+    @Override
+    protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
       return new GroupWriteSupport(type, extraMetaData);
     }
 
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
index c49b681d5..6cb4b6f7f 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
@@ -22,6 +22,8 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
 import org.apache.parquet.hadoop.api.ReadSupport;
@@ -34,6 +36,13 @@ public class GroupReadSupport extends ReadSupport<Group> {
   public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(
       Configuration configuration, Map<String, String> keyValueMetaData,
       MessageType fileSchema) {
+    return init(new HadoopParquetConfiguration(configuration), 
keyValueMetaData, fileSchema);
+  }
+
+  @Override
+  public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(
+      ParquetConfiguration configuration, Map<String, String> keyValueMetaData,
+      MessageType fileSchema) {
     String partialSchemaString = 
configuration.get(ReadSupport.PARQUET_READ_SCHEMA);
     MessageType requestedProjection = getSchemaForRead(fileSchema, 
partialSchemaString);
     return new ReadContext(requestedProjection);
@@ -46,4 +55,11 @@ public class GroupReadSupport extends ReadSupport<Group> {
     return new GroupRecordConverter(readContext.getRequestedSchema());
   }
 
+  @Override
+  public RecordMaterializer<Group> prepareForRead(ParquetConfiguration 
configuration,
+                                                  Map<String, String> 
keyValueMetaData, MessageType fileSchema,
+                                                  
org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) {
+    return new GroupRecordConverter(readContext.getRequestedSchema());
+  }
+
 }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
index dfed676c9..a4d4a3f36 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
@@ -26,6 +26,8 @@ import java.util.Objects;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.GroupWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -41,6 +43,10 @@ public class GroupWriteSupport extends WriteSupport<Group> {
   }
 
   public static MessageType getSchema(Configuration configuration) {
+    return getSchema(new HadoopParquetConfiguration(configuration));
+  }
+
+  public static MessageType getSchema(ParquetConfiguration configuration) {
     return 
parseMessageType(Objects.requireNonNull(configuration.get(PARQUET_EXAMPLE_SCHEMA),
 PARQUET_EXAMPLE_SCHEMA));
   }
 
@@ -68,6 +74,11 @@ public class GroupWriteSupport extends WriteSupport<Group> {
 
   @Override
   public org.apache.parquet.hadoop.api.WriteSupport.WriteContext 
init(Configuration configuration) {
+    return init(new HadoopParquetConfiguration(configuration));
+  }
+
+  @Override
+  public org.apache.parquet.hadoop.api.WriteSupport.WriteContext 
init(ParquetConfiguration configuration) {
     // if present, prefer the schema passed to the constructor
     if (schema == null) {
       schema = getSchema(configuration);
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
index 7f39cd76c..ca524d90b 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
@@ -19,18 +19,26 @@
 package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.BadConfigurationException;
 
+import java.util.Map;
+
 public class ConfigurationUtil {
 
   public static Class<?> getClassFromConfig(Configuration configuration, 
String configName, Class<?> assignableFrom) {
+    return getClassFromConfig(new HadoopParquetConfiguration(configuration), 
configName, assignableFrom);
+  }
+
+  public static Class<?> getClassFromConfig(ParquetConfiguration 
configuration, String configName, Class<?> assignableFrom) {
     final String className = configuration.get(configName);
     if (className == null) {
       return null;
     }
-    
+
     try {
-      final Class<?> foundClass = configuration.getClassByName(className);     
+      final Class<?> foundClass = configuration.getClassByName(className);
       if (!assignableFrom.isAssignableFrom(foundClass)) {
         throw new BadConfigurationException("class " + className + " set in 
job conf at "
                 + configName + " is not a subclass of " + 
assignableFrom.getCanonicalName());
@@ -41,4 +49,18 @@ public class ConfigurationUtil {
     }
   }
 
+  public static Configuration createHadoopConfiguration(ParquetConfiguration 
conf) {
+    if (conf == null) {
+      return new Configuration();
+    }
+    if (conf instanceof HadoopParquetConfiguration) {
+      return ((HadoopParquetConfiguration) conf).getConfiguration();
+    }
+    Configuration configuration = new Configuration();
+    for (Map.Entry<String, String> entry : conf) {
+      configuration.set(entry.getKey(), entry.getValue());
+    }
+    return configuration;
+  }
+
 }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
index a46c8db21..845cafc5a 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
@@ -22,6 +22,7 @@ package org.apache.parquet.hadoop.util;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.CodecFactory;
 
 public class HadoopCodecs {
@@ -33,6 +34,10 @@ public class HadoopCodecs {
     return new CodecFactory(conf, sizeHint);
   }
 
+  public static CompressionCodecFactory newFactory(ParquetConfiguration conf, 
int sizeHint) {
+    return new CodecFactory(conf, sizeHint);
+  }
+
   public static CompressionCodecFactory newDirectFactory(Configuration conf, 
ByteBufferAllocator allocator, int sizeHint) {
     return CodecFactory.createDirectCodecFactory(conf, allocator, sizeHint);
   }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
index 199b774c4..6e669ed8b 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
@@ -29,6 +29,8 @@ import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 
 /**
  * Serialization utils copied from:
@@ -70,8 +72,22 @@ public final class SerializationUtil {
    * @return the read object, or null if key is not present in conf
    * @throws IOException if there is an error while reading
    */
-  @SuppressWarnings("unchecked")
   public static <T> T readObjectFromConfAsBase64(String key, Configuration 
conf) throws IOException {
+    return readObjectFromConfAsBase64(key, new 
HadoopParquetConfiguration(conf));
+  }
+
+  /**
+   * Reads an object (that was written using
+   * {@link #writeObjectToConfAsBase64}) from a configuration
+   *
+   * @param key for the configuration
+   * @param conf to read from
+   * @param <T> the Java type of the deserialized object
+   * @return the read object, or null if key is not present in conf
+   * @throws IOException if there is an error while reading
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T readObjectFromConfAsBase64(String key, 
ParquetConfiguration conf) throws IOException {
     String b64 = conf.get(key);
     if (b64 == null) {
       return null;
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java 
b/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
index 074d2e8b6..e8e032c9c 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 import org.apache.parquet.hadoop.ParquetWriter;
@@ -86,6 +87,11 @@ public class DirectWriterTest {
 
     @Override
     public WriteContext init(Configuration configuration) {
+      return init((ParquetConfiguration) null);
+    }
+
+    @Override
+    public WriteContext init(ParquetConfiguration configuration) {
       return new WriteContext(type, metadata);
     }
 
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
index 862ae672c..a7c200283 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
@@ -22,6 +22,8 @@ package org.apache.parquet.crypto.propertiesfactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.crypto.EncryptionPropertiesFactory;
 import org.apache.parquet.crypto.ParquetCipher;
 import org.apache.parquet.example.data.Group;
@@ -203,6 +205,11 @@ public class SchemaControlEncryptionTest {
 
     @Override
     public WriteContext init(Configuration conf) {
+      return init(new HadoopParquetConfiguration(conf));
+    }
+
+    @Override
+    public WriteContext init(ParquetConfiguration conf) {
       WriteContext writeContext = super.init(conf);
       MessageType schema = writeContext.getSchema();
       List<ColumnDescriptor> columns = schema.getColumns();
@@ -219,6 +226,10 @@ public class SchemaControlEncryptionTest {
     }
 
     private void setMetadata(ColumnDescriptor column, Configuration conf) {
+      setMetadata(column, new HadoopParquetConfiguration(conf));
+    }
+
+    private void setMetadata(ColumnDescriptor column, ParquetConfiguration 
conf) {
       String columnShortName = column.getPath()[column.getPath().length - 1];
       if (cryptoMetadata.containsKey(columnShortName) &&
         cryptoMetadata.get(columnShortName).get("columnKeyMetaData") != null) {
@@ -242,6 +253,11 @@ public class SchemaControlEncryptionTest {
 
     @Override
     protected WriteSupport<Group> getWriteSupport(Configuration conf) {
+      return getWriteSupport((ParquetConfiguration) null);
+    }
+
+    @Override
+    protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
       return new CryptoGroupWriteSupport();
     }
   }
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
index e3ddb3e4c..d64e8d975 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
@@ -330,7 +330,7 @@ public class TestEncryptionOptions {
     Path rootPath = new Path(temporaryFolder.getRoot().getPath());
     LOG.info("======== testWriteReadEncryptedParquetFiles {} ========", 
rootPath.toString());
     byte[] AADPrefix = AAD_PREFIX_STRING.getBytes(StandardCharsets.UTF_8);
-    // Write using various encryption configuraions
+    // Write using various encryption configurations
     testWriteEncryptedParquetFiles(rootPath, DATA);
     // Read using various decryption configurations.
     testReadEncryptedParquetFiles(rootPath, DATA);
diff --git 
a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java 
b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java
index 50f9ebcc3..cca1a91f2 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -61,6 +63,14 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
    * @return the pig schema requested by the user or null if none.
    */
   static Schema getPigSchema(Configuration configuration) {
+    return getPigSchema(new HadoopParquetConfiguration(configuration));
+  }
+
+  /**
+   * @param configuration the configuration
+   * @return the pig schema requested by the user or null if none.
+   */
+  static Schema getPigSchema(ParquetConfiguration configuration) {
     return parsePigSchema(configuration.get(PARQUET_PIG_SCHEMA));
   }
 
@@ -69,9 +79,17 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
    * @return List of required fields from pushProjection
    */
   static RequiredFieldList getRequiredFields(Configuration configuration) {
+    return getRequiredFields(new HadoopParquetConfiguration(configuration));
+  }
+
+  /**
+   * @param configuration configuration
+   * @return List of required fields from pushProjection
+   */
+  static RequiredFieldList getRequiredFields(ParquetConfiguration 
configuration) {
     String requiredFieldString = 
configuration.get(PARQUET_PIG_REQUIRED_FIELDS);
 
-    if(requiredFieldString == null) {
+    if (requiredFieldString == null) {
       return null;
     }
 
@@ -154,9 +172,9 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
 
   @Override
   public ReadContext init(InitContext initContext) {
-    Schema pigSchema = getPigSchema(initContext.getConfiguration());
-    RequiredFieldList requiredFields = 
getRequiredFields(initContext.getConfiguration());
-    boolean columnIndexAccess = 
initContext.getConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
+    Schema pigSchema = getPigSchema(initContext.getParquetConfiguration());
+    RequiredFieldList requiredFields = 
getRequiredFields(initContext.getParquetConfiguration());
+    boolean columnIndexAccess = 
initContext.getParquetConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, 
false);
 
     if (pigSchema == null) {
       return new ReadContext(initContext.getFileSchema());
@@ -174,9 +192,17 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
       Map<String, String> keyValueMetaData,
       MessageType fileSchema,
       ReadContext readContext) {
+    return prepareForRead(new HadoopParquetConfiguration(configuration), 
keyValueMetaData, fileSchema, readContext);
+  }
+
+  @Override
+  public RecordMaterializer<Tuple> prepareForRead(
+      ParquetConfiguration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema,
+      ReadContext readContext) {
     MessageType requestedSchema = readContext.getRequestedSchema();
     Schema requestedPigSchema = getPigSchema(configuration);
-
     if (requestedPigSchema == null) {
       throw new ParquetDecodingException("Missing Pig schema: ParquetLoader sets the schema in the job conf");
     }
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
index 68a7d7d22..fd1bb39cd 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
@@ -26,6 +26,8 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -82,6 +84,11 @@ public class TupleWriteSupport extends WriteSupport<Tuple> {
 
   @Override
   public WriteContext init(Configuration configuration) {
+    return init(new HadoopParquetConfiguration(configuration));
+  }
+
+  @Override
+  public WriteContext init(ParquetConfiguration configuration) {
     Map<String, String> extraMetaData = new HashMap<String, String>();
     new PigMetaData(rootPigSchema).addToMetaData(extraMetaData);
     return new WriteContext(rootSchema, extraMetaData);
diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java
index ff4bd87d6..1c21044cb 100644
--- a/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java
+++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java
@@ -33,6 +33,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -175,7 +176,7 @@ public class TestTupleRecordConsumer {
 
   private <T> TupleWriteSupport newTupleWriter(String pigSchemaString, 
RecordMaterializer<T> recordConsumer) throws ParserException {
     TupleWriteSupport tupleWriter = 
TupleWriteSupport.fromPigSchema(pigSchemaString);
-    tupleWriter.init(null);
+    tupleWriter.init((ParquetConfiguration) null);
     tupleWriter.prepareForWrite(
         new ConverterConsumer(recordConsumer.getRootConverter(), 
tupleWriter.getParquetSchema())
         );
diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
index c8e36aded..12c3373f5 100644
--- a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
+++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.logging.Level;
 
 import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.NonSpillableDataBag;
@@ -130,8 +131,8 @@ public class TupleConsumerPerfTest {
     TupleReadSupport tupleReadSupport = new TupleReadSupport();
     Map<String, String> pigMetaData = pigMetaData(pigSchemaString);
     MessageType schema = new 
PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchemaString));
-    ReadContext init = tupleReadSupport.init(null, pigMetaData, schema);
-    RecordMaterializer<Tuple> recordConsumer = 
tupleReadSupport.prepareForRead(null, pigMetaData, schema, init);
+    ReadContext init = tupleReadSupport.init((ParquetConfiguration) null, 
pigMetaData, schema);
+    RecordMaterializer<Tuple> recordConsumer = 
tupleReadSupport.prepareForRead((ParquetConfiguration) null, pigMetaData, 
schema, init);
     RecordReader<Tuple> recordReader = columnIO.getRecordReader(columns, 
recordConsumer);
     // TODO: put this back
 //  if (DEBUG) {
@@ -156,7 +157,7 @@ public class TupleConsumerPerfTest {
   private static void write(MemPageStore memPageStore, ColumnWriteStoreV1 
columns, MessageType schema, String pigSchemaString) throws ExecException, 
ParserException {
     MessageColumnIO columnIO = newColumnFactory(pigSchemaString);
     TupleWriteSupport tupleWriter = 
TupleWriteSupport.fromPigSchema(pigSchemaString);
-    tupleWriter.init(null);
+    tupleWriter.init((ParquetConfiguration) null);
     tupleWriter.prepareForWrite(columnIO.getRecordWriter(columns));
     write(memPageStore, tupleWriter, 10000);
     write(memPageStore, tupleWriter, 10000);
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
index 8383fbc75..da51788f2 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
@@ -25,6 +25,8 @@ import com.google.protobuf.Message;
 import com.twitter.elephantbird.util.Protobufs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.BadConfigurationException;
 import org.apache.parquet.io.InvalidRecordException;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -68,7 +70,7 @@ class ProtoMessageConverter extends GroupConverter {
     }
   };
 
-  protected final Configuration conf;
+  protected final ParquetConfiguration conf;
   protected final Converter[] converters;
   protected final ParentValueContainer parent;
   protected final Message.Builder myBuilder;
@@ -88,8 +90,16 @@ class ProtoMessageConverter extends GroupConverter {
     this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, 
extraMetadata);
   }
 
+  ProtoMessageConverter(ParquetConfiguration conf, ParentValueContainer pvc, 
Class<? extends Message> protoClass, GroupType parquetSchema, Map<String, 
String> extraMetadata) {
+    this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, 
extraMetadata);
+  }
+
   // For usage in message arrays
   ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, 
Message.Builder builder, GroupType parquetSchema, Map<String, String> 
extraMetadata) {
+    this(new HadoopParquetConfiguration(conf), pvc, builder, parquetSchema, 
extraMetadata);
+  }
+
+  ProtoMessageConverter(ParquetConfiguration conf, ParentValueContainer pvc, 
Message.Builder builder, GroupType parquetSchema, Map<String, String> 
extraMetadata) {
     if (pvc == null) {
       throw new IllegalStateException("Missing parent value container");
     }
@@ -141,7 +151,12 @@ class ProtoMessageConverter extends GroupConverter {
   private Converter dummyScalarConverter(ParentValueContainer pvc,
                                          Type parquetField, Configuration conf,
                                          Map<String, String> extraMetadata) {
+    return dummyScalarConverter(pvc, parquetField, new 
HadoopParquetConfiguration(conf), extraMetadata);
+  }
 
+  private Converter dummyScalarConverter(ParentValueContainer pvc,
+                                         Type parquetField, 
ParquetConfiguration conf,
+                                         Map<String, String> extraMetadata) {
     if (parquetField.isPrimitive()) {
       PrimitiveType primitiveType = parquetField.asPrimitiveType();
       PrimitiveType.PrimitiveTypeName primitiveTypeName = 
primitiveType.getPrimitiveTypeName();
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
index a85b4ef55..f1bd64466 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.proto;
 import com.google.protobuf.Message;
 import com.google.protobuf.MessageOrBuilder;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -120,8 +121,15 @@ public class ProtoParquetWriter<T extends 
MessageOrBuilder> extends ParquetWrite
                        return this;
                }
 
-               protected WriteSupport<T> getWriteSupport(Configuration conf) {
-                   return (WriteSupport<T>) 
ProtoParquetWriter.writeSupport(protoMessage);
-               }
+    @Override
+    protected WriteSupport<T> getWriteSupport(Configuration conf) {
+      return getWriteSupport((ParquetConfiguration) null);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
+      return (WriteSupport<T>) ProtoParquetWriter.writeSupport(protoMessage);
+    }
        }
 }
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
index 78edf70d2..6343992e5 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
@@ -21,6 +21,8 @@ package org.apache.parquet.proto;
 import com.google.protobuf.Message;
 import com.twitter.elephantbird.util.Protobufs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.api.InitContext;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.io.api.RecordMaterializer;
@@ -59,7 +61,7 @@ public class ProtoReadSupport<T extends Message> extends 
ReadSupport<T> {
 
   @Override
   public ReadContext init(InitContext context) {
-    String requestedProjectionString = 
context.getConfiguration().get(PB_REQUESTED_PROJECTION);
+    String requestedProjectionString = 
context.getParquetConfiguration().get(PB_REQUESTED_PROJECTION);
 
     if (requestedProjectionString != null && 
!requestedProjectionString.trim().isEmpty()) {
       MessageType requestedProjection = 
getSchemaForRead(context.getFileSchema(), requestedProjectionString);
@@ -74,6 +76,11 @@ public class ProtoReadSupport<T extends Message> extends 
ReadSupport<T> {
 
   @Override
   public RecordMaterializer<T> prepareForRead(Configuration configuration, 
Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext 
readContext) {
+    return prepareForRead(new HadoopParquetConfiguration(configuration), 
keyValueMetaData, fileSchema, readContext);
+  }
+
+  @Override
+  public RecordMaterializer<T> prepareForRead(ParquetConfiguration 
configuration, Map<String, String> keyValueMetaData, MessageType fileSchema, 
ReadContext readContext) {
     String headerProtoClass = keyValueMetaData.get(PB_CLASS);
     String configuredProtoClass = configuration.get(PB_CLASS);
 
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java
index 75a67f12c..4ddf23d6e 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java
@@ -22,6 +22,7 @@ package org.apache.parquet.proto;
 import com.google.protobuf.Message;
 import com.google.protobuf.MessageOrBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.schema.MessageType;
 
 import java.util.Collections;
@@ -54,6 +55,11 @@ public class ProtoRecordConverter<T extends 
MessageOrBuilder> extends ProtoMessa
     reusedBuilder = getBuilder();
   }
 
+  public ProtoRecordConverter(ParquetConfiguration conf, Class<? extends 
Message> protoclass, MessageType parquetSchema, Map<String, String> 
extraMetadata) {
+    super(conf, new SkipParentValueContainer(), protoclass, parquetSchema, 
extraMetadata);
+    reusedBuilder = getBuilder();
+  }
+
   public ProtoRecordConverter(Configuration conf, Message.Builder builder, 
MessageType parquetSchema, Map<String, String> extraMetadata) {
     super(conf, new SkipParentValueContainer(), builder, parquetSchema, 
extraMetadata);
     reusedBuilder = getBuilder();
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
index dd77ca6b6..63640d330 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
@@ -21,6 +21,8 @@ package org.apache.parquet.proto;
 import com.google.protobuf.Message;
 import com.google.protobuf.MessageOrBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.schema.MessageType;
@@ -32,6 +34,10 @@ class ProtoRecordMaterializer<T extends MessageOrBuilder> 
extends RecordMaterial
   private final ProtoRecordConverter<T> root;
 
   public ProtoRecordMaterializer(Configuration conf, MessageType 
requestedSchema, Class<? extends Message> protobufClass, Map<String, String> 
metadata) {
+    this(new HadoopParquetConfiguration(conf), requestedSchema, protobufClass, 
metadata);
+  }
+
+  public ProtoRecordMaterializer(ParquetConfiguration conf, MessageType 
requestedSchema, Class<? extends Message> protobufClass, Map<String, String> 
metadata) {
     this.root = new ProtoRecordConverter<T>(conf, protobufClass, 
requestedSchema, metadata);
   }
 
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
index c3570323f..a6a779d07 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
@@ -25,6 +25,8 @@ import 
com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
 import com.google.protobuf.Message;
 import com.twitter.elephantbird.util.Protobufs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -81,9 +83,19 @@ public class ProtoSchemaConverter {
    * Instantiate a schema converter to get the parquet schema corresponding to 
protobuf classes.
    * Returns instances that are not specs compliant and limited to 5 levels of 
recursion depth.
    *
-   * @param config   Hadoop configuration object to parrse 
parquetSpecsCompliant and maxRecursion settings.
+   * @param config   Hadoop configuration object to parse 
parquetSpecsCompliant and maxRecursion settings.
    */
   public ProtoSchemaConverter(Configuration config) {
+    this(new HadoopParquetConfiguration(config));
+  }
+
+  /**
+   * Instantiate a schema converter to get the parquet schema corresponding to 
protobuf classes.
+   * Returns instances that are not specs compliant and limited to 5 levels of 
recursion depth.
+   *
+   * @param config   Parquet configuration object to parse 
parquetSpecsCompliant and maxRecursion settings.
+   */
+  public ProtoSchemaConverter(ParquetConfiguration config) {
     this(
         config.getBoolean(ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE, false),
         config.getInt(PB_MAX_RECURSION, 5));
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
index f15511062..b13acd2a5 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
@@ -23,6 +23,8 @@ import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Descriptors.FieldDescriptor;
 import com.twitter.elephantbird.util.Protobufs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.hadoop.BadConfigurationException;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.InvalidRecordException;
@@ -118,6 +120,11 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> 
extends WriteSupport<
 
   @Override
   public WriteContext init(Configuration configuration) {
+    return init(new HadoopParquetConfiguration(configuration));
+  }
+
+  @Override
+  public WriteContext init(ParquetConfiguration configuration) {
 
     Map<String, String> extraMetaData = new HashMap<>();
 
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
index 0c3fe440d..baa3ddb92 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
@@ -18,6 +18,8 @@ package org.apache.parquet.hadoop.thrift;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 
 import com.twitter.elephantbird.pig.util.ThriftToPig;
@@ -40,14 +42,22 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
   public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class";
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractThriftWriteSupport.class);
-  private static Configuration conf;
+  private static ParquetConfiguration conf;
 
   public static void setGenericThriftClass(Configuration configuration, 
Class<?> thriftClass) {
+    setGenericThriftClass(new HadoopParquetConfiguration(configuration), 
thriftClass);
+  }
+
+  public static void setGenericThriftClass(ParquetConfiguration configuration, 
Class<?> thriftClass) {
     conf = configuration;
     configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName());
   }
 
-  public static Class getGenericThriftClass(Configuration configuration) {
+  public static Class<?> getGenericThriftClass(Configuration configuration) {
+    return getGenericThriftClass(new 
HadoopParquetConfiguration(configuration));
+  }
+
+  public static Class<?> getGenericThriftClass(ParquetConfiguration 
configuration) {
     final String thriftClassName = configuration.get(PARQUET_THRIFT_CLASS);
     if (thriftClassName == null) {
       throw new BadConfigurationException("the thrift class conf is missing in job conf at " + PARQUET_THRIFT_CLASS);
@@ -111,9 +121,14 @@ public abstract class AbstractThriftWriteSupport<T> 
extends WriteSupport<T> {
 
   @Override
   public WriteContext init(Configuration configuration) {
+    return init(new HadoopParquetConfiguration(configuration));
+  }
+
+  @Override
+  public WriteContext init(ParquetConfiguration configuration) {
     conf = configuration;
     if (writeContext == null) {
-      init(getGenericThriftClass(configuration));
+      init((Class<T>) getGenericThriftClass(configuration));
     }
     return writeContext;
   }
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
index c1ece9fcf..60dfc12e7 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
@@ -16,6 +16,8 @@
 package org.apache.parquet.hadoop.thrift;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 
@@ -25,14 +27,22 @@ import 
org.apache.parquet.thrift.struct.ThriftType.StructType;
 
 public class TBaseWriteSupport<T extends TBase<?, ?>> extends 
AbstractThriftWriteSupport<T> {
 
-  private static Configuration conf;
+  private static ParquetConfiguration conf;
 
   public static <U extends TBase<?,?>> void setThriftClass(Configuration 
configuration, Class<U> thriftClass) {
+    setThriftClass(new HadoopParquetConfiguration(configuration), thriftClass);
+  }
+
+  public static <U extends TBase<?,?>> void 
setThriftClass(ParquetConfiguration configuration, Class<U> thriftClass) {
     conf = configuration;
     AbstractThriftWriteSupport.setGenericThriftClass(configuration, 
thriftClass);
   }
 
   public static Class<? extends TBase<?,?>> getThriftClass(Configuration 
configuration) {
+    return getThriftClass(new HadoopParquetConfiguration(configuration));
+  }
+
+  public static Class<? extends TBase<?,?>> 
getThriftClass(ParquetConfiguration configuration) {
     return (Class<? extends 
TBase<?,?>>)AbstractThriftWriteSupport.getGenericThriftClass(configuration);
   }
 
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
index 6b9d75d98..6c9311f4e 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
@@ -24,6 +24,8 @@ import java.lang.reflect.Method;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -57,6 +59,10 @@ public class ThriftBytesWriteSupport extends 
WriteSupport<BytesWritable> {
   }
 
   public static Class<TProtocolFactory> getTProtocolFactoryClass(Configuration 
conf) {
+    return getTProtocolFactoryClass(new HadoopParquetConfiguration(conf));
+  }
+
+  public static Class<TProtocolFactory> 
getTProtocolFactoryClass(ParquetConfiguration conf) {
     final String tProtocolClassName = conf.get(PARQUET_PROTOCOL_CLASS);
     if (tProtocolClassName == null) {
       throw new BadConfigurationException("the protocol class conf is missing in job conf at " + PARQUET_PROTOCOL_CLASS);
@@ -80,7 +86,7 @@ public class ThriftBytesWriteSupport extends 
WriteSupport<BytesWritable> {
   private StructType thriftStruct;
   private ParquetWriteProtocol parquetWriteProtocol;
   private final FieldIgnoredHandler errorHandler;
-  private Configuration configuration;
+  private ParquetConfiguration configuration;
 
   public ThriftBytesWriteSupport() {
     this.buffered = true;
@@ -106,6 +112,15 @@ public class ThriftBytesWriteSupport extends 
WriteSupport<BytesWritable> {
       Class<? extends TBase<?, ?>> thriftClass,
       boolean buffered,
       FieldIgnoredHandler errorHandler) {
+    this(new HadoopParquetConfiguration(configuration), protocolFactory, 
thriftClass, buffered, errorHandler);
+  }
+
+  public ThriftBytesWriteSupport(
+      ParquetConfiguration configuration,
+      TProtocolFactory protocolFactory,
+      Class<? extends TBase<?, ?>> thriftClass,
+      boolean buffered,
+      FieldIgnoredHandler errorHandler) {
     super();
     this.configuration = configuration;
     this.protocolFactory = protocolFactory;
@@ -124,6 +139,11 @@ public class ThriftBytesWriteSupport extends 
WriteSupport<BytesWritable> {
 
   @Override
   public WriteContext init(Configuration configuration) {
+    return init(new HadoopParquetConfiguration(configuration));
+  }
+
+  @Override
+  public WriteContext init(ParquetConfiguration configuration) {
     this.configuration = configuration;
     if (this.protocolFactory == null) {
       try {
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
index 2375a6df6..bd9530b82 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -25,6 +25,8 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.protocol.TProtocol;
 
@@ -111,6 +113,10 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
   }
 
   public static FieldProjectionFilter getFieldProjectionFilter(Configuration 
conf) {
+    return getFieldProjectionFilter(new HadoopParquetConfiguration(conf));
+  }
+
+  public static FieldProjectionFilter 
getFieldProjectionFilter(ParquetConfiguration conf) {
     String deprecated = conf.get(THRIFT_COLUMN_FILTER_KEY);
     String strict = conf.get(STRICT_THRIFT_COLUMN_FILTER_KEY);
 
@@ -155,7 +161,7 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
 
   @Override
   public org.apache.parquet.hadoop.api.ReadSupport.ReadContext 
init(InitContext context) {
-    final Configuration configuration = context.getConfiguration();
+    final ParquetConfiguration configuration = 
context.getParquetConfiguration();
     final MessageType fileMessageType = context.getFileSchema();
     MessageType requestedProjection = fileMessageType;
     String partialSchemaString = 
configuration.get(ReadSupport.PARQUET_READ_SCHEMA);
@@ -185,9 +191,14 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
     return new ReadContext(schemaForRead);
   }
 
-  @SuppressWarnings("unchecked")
   protected MessageType getProjectedSchema(Configuration configuration, 
FieldProjectionFilter
       fieldProjectionFilter) {
+    return getProjectedSchema(new HadoopParquetConfiguration(configuration), 
fieldProjectionFilter);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected MessageType getProjectedSchema(ParquetConfiguration configuration, 
FieldProjectionFilter
+      fieldProjectionFilter) {
     return new ThriftSchemaConverter(configuration, fieldProjectionFilter)
         .convert((Class<TBase<?, ?>>)thriftClass);
   }
@@ -200,8 +211,12 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
       .convert((Class<TBase<?, ?>>)thriftClass);
   }
 
-  @SuppressWarnings("unchecked")
   private void initThriftClassFromMultipleFiles(Map<String, Set<String>> 
fileMetadata, Configuration conf) throws ClassNotFoundException {
+    initThriftClassFromMultipleFiles(fileMetadata, new 
HadoopParquetConfiguration(conf));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void initThriftClassFromMultipleFiles(Map<String, Set<String>> 
fileMetadata, ParquetConfiguration conf) throws ClassNotFoundException {
     if (thriftClass != null) {
       return;
     }
@@ -216,8 +231,12 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
     thriftClass = (Class<T>)Class.forName(className);
   }
 
-  @SuppressWarnings("unchecked")
   private void initThriftClass(ThriftMetaData metadata, Configuration conf) 
throws ClassNotFoundException {
+    initThriftClass(metadata, new HadoopParquetConfiguration(conf));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void initThriftClass(ThriftMetaData metadata, ParquetConfiguration 
conf) throws ClassNotFoundException {
     if (thriftClass != null) {
       return;
     }
@@ -254,10 +273,45 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
         configuration);
   }
 
-  @SuppressWarnings("unchecked")
+  @Override
+  public RecordMaterializer<T> prepareForRead(ParquetConfiguration 
configuration,
+                                              Map<String, String> 
keyValueMetaData, MessageType fileSchema,
+                                              
org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) {
+    ThriftMetaData thriftMetaData = 
ThriftMetaData.fromExtraMetaData(keyValueMetaData);
+    try {
+      initThriftClass(thriftMetaData, configuration);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Cannot find Thrift object class for 
metadata: " + thriftMetaData, e);
+    }
+
+    // if there was not metadata in the file, get it from requested class
+    if (thriftMetaData == null) {
+      thriftMetaData = ThriftMetaData.fromThriftClass(thriftClass);
+    }
+
+    String converterClassName = configuration.get(RECORD_CONVERTER_CLASS_KEY, 
RECORD_CONVERTER_DEFAULT);
+    return getRecordConverterInstance(converterClassName, thriftClass,
+      readContext.getRequestedSchema(), thriftMetaData.getDescriptor(),
+      configuration);
+  }
+
   private static <T> ThriftRecordConverter<T> getRecordConverterInstance(
       String converterClassName, Class<T> thriftClass,
       MessageType requestedSchema, StructType descriptor, Configuration conf) {
+    return getRecordConverterInstance(converterClassName, thriftClass, 
requestedSchema, descriptor, conf, Configuration.class);
+  }
+
+  private static <T> ThriftRecordConverter<T> getRecordConverterInstance(
+      String converterClassName, Class<T> thriftClass,
+      MessageType requestedSchema, StructType descriptor, ParquetConfiguration 
conf) {
+    return getRecordConverterInstance(converterClassName, thriftClass, 
requestedSchema, descriptor, conf, ParquetConfiguration.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T, CONF> ThriftRecordConverter<T> getRecordConverterInstance(
+    String converterClassName, Class<T> thriftClass,
+    MessageType requestedSchema, StructType descriptor, CONF conf, Class<CONF> 
confClass) {
+
     Class<ThriftRecordConverter<T>> converterClass;
     try {
       converterClass = (Class<ThriftRecordConverter<T>>) 
Class.forName(converterClassName);
@@ -269,14 +323,14 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
       // first try the new version that accepts a Configuration
       try {
         Constructor<ThriftRecordConverter<T>> constructor =
-            converterClass.getConstructor(Class.class, MessageType.class, 
StructType.class, Configuration.class);
+          converterClass.getConstructor(Class.class, MessageType.class, 
StructType.class, confClass);
         return constructor.newInstance(thriftClass, requestedSchema, 
descriptor, conf);
       } catch (IllegalAccessException | NoSuchMethodException e) {
         // try the other constructor pattern
       }
 
       Constructor<ThriftRecordConverter<T>> constructor =
-          converterClass.getConstructor(Class.class, MessageType.class, 
StructType.class);
+        converterClass.getConstructor(Class.class, MessageType.class, 
StructType.class);
       return constructor.newInstance(thriftClass, requestedSchema, descriptor);
     } catch (InstantiationException | InvocationTargetException e) {
       throw new RuntimeException("Failed to construct Thrift converter class: 
" + converterClassName, e);
diff --git 
a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java
 
b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java
index a9864ff81..2ac4fccf1 100644
--- 
a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java
+++ 
b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.hadoop.thrift;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -68,6 +69,11 @@ public class ThriftWriteSupport<T extends TBase<?,?>> 
extends WriteSupport<T> {
     return this.writeSupport.init(configuration);
   }
 
+  @Override
+  public WriteContext init(ParquetConfiguration configuration) {
+    return this.writeSupport.init(configuration);
+  }
+
   @Override
   public void prepareForWrite(RecordConsumer recordConsumer) {
     this.writeSupport.prepareForWrite(recordConsumer);
diff --git 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
index ba48b3779..f129f36b7 100644
--- 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
+++ 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
@@ -22,6 +22,8 @@ package org.apache.parquet.thrift;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TField;
 import org.apache.thrift.protocol.TList;
@@ -500,6 +502,11 @@ public class ParquetWriteProtocol extends ParquetProtocol {
 
   public ParquetWriteProtocol(
       Configuration configuration, RecordConsumer recordConsumer, 
MessageColumnIO schema, StructType thriftType) {
+    this(new HadoopParquetConfiguration(configuration), recordConsumer, 
schema, thriftType);
+  }
+
+  public ParquetWriteProtocol(
+      ParquetConfiguration configuration, RecordConsumer recordConsumer, 
MessageColumnIO schema, StructType thriftType) {
     this.recordConsumer = recordConsumer;
     if (configuration != null) {
       this.writeThreeLevelList = configuration.getBoolean(
diff --git 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java
 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java
index 78fc4a88f..fa31a5c78 100644
--- 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java
+++ 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java
@@ -19,6 +19,8 @@
 package org.apache.parquet.thrift;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocol;
@@ -38,10 +40,15 @@ public class TBaseRecordConverter<T extends TBase<?,?>> 
extends ThriftRecordConv
    */
   @Deprecated
   public TBaseRecordConverter(final Class<T> thriftClass, MessageType 
requestedParquetSchema, StructType thriftType) {
-    this(thriftClass, requestedParquetSchema, thriftType, null);
+    this(thriftClass, requestedParquetSchema, thriftType, 
(HadoopParquetConfiguration) null);
   }
 
+  @SuppressWarnings("unused")
   public TBaseRecordConverter(final Class<T> thriftClass, MessageType 
requestedParquetSchema, StructType thriftType, Configuration conf) {
+    this(thriftClass, requestedParquetSchema, thriftType, new 
HadoopParquetConfiguration(conf));
+  }
+
+  public TBaseRecordConverter(final Class<T> thriftClass, MessageType 
requestedParquetSchema, StructType thriftType, ParquetConfiguration conf) {
     super(new ThriftReader<T>() {
       @Override
       public T readOneRecord(TProtocol protocol) throws TException {
diff --git 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
index 3244b3211..d0649212f 100644
--- 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
+++ 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TField;
 import org.apache.thrift.protocol.TList;
@@ -843,7 +845,7 @@ public class ThriftRecordConverter<T> extends 
RecordMaterializer<T> {
    */
   @Deprecated
   public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, 
MessageType requestedParquetSchema, ThriftType.StructType thriftType) {
-    this(thriftReader, name, requestedParquetSchema, thriftType, null);
+    this(thriftReader, name, requestedParquetSchema, thriftType, 
(ParquetConfiguration) null);
   }
 
   /**
@@ -855,6 +857,17 @@ public class ThriftRecordConverter<T> extends 
RecordMaterializer<T> {
    * @param conf a Configuration
    */
   public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, 
MessageType requestedParquetSchema, ThriftType.StructType thriftType, 
Configuration conf) {
+    this(thriftReader, name, requestedParquetSchema, thriftType, new 
HadoopParquetConfiguration(conf));
+  }
+
+  /**
+   * @param thriftReader the class responsible for instantiating the final 
object and read from the protocol
+   * @param name the name of that type ( the thrift class simple name)
+   * @param requestedParquetSchema the schema for the incoming columnar events
+   * @param thriftType the thrift type descriptor
+   * @param conf a Configuration
+   */
+  public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, 
MessageType requestedParquetSchema, ThriftType.StructType thriftType, 
ParquetConfiguration conf) {
     super();
     this.thriftReader = thriftReader;
     this.protocol = new ParquetReadProtocol();
diff --git 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
index f915a6ed1..c32df8147 100644
--- 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
+++ 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
@@ -24,6 +24,8 @@ import java.util.Objects;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ShouldNeverHappenException;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
@@ -80,6 +82,11 @@ class ThriftSchemaConvertVisitor implements 
ThriftType.StateVisitor<ConvertedFie
 
   private ThriftSchemaConvertVisitor(FieldProjectionFilter 
fieldProjectionFilter, boolean doProjection,
                                      boolean keepOneOfEachUnion, Configuration 
configuration) {
+    this(fieldProjectionFilter, doProjection, keepOneOfEachUnion, new 
HadoopParquetConfiguration(configuration));
+  }
+
+  private ThriftSchemaConvertVisitor(FieldProjectionFilter 
fieldProjectionFilter, boolean doProjection,
+                                     boolean keepOneOfEachUnion, 
ParquetConfiguration configuration) {
     this.fieldProjectionFilter = Objects.requireNonNull(fieldProjectionFilter,
       "fieldProjectionFilter cannot be null");
     this.doProjection = doProjection;
@@ -105,6 +112,11 @@ class ThriftSchemaConvertVisitor implements 
ThriftType.StateVisitor<ConvertedFie
 
   public static MessageType convert(StructType struct, FieldProjectionFilter 
filter, boolean keepOneOfEachUnion,
                                     Configuration conf) {
+    return convert(struct, filter, keepOneOfEachUnion, new 
HadoopParquetConfiguration(conf));
+  }
+
+  public static MessageType convert(StructType struct, FieldProjectionFilter 
filter, boolean keepOneOfEachUnion,
+                                    ParquetConfiguration conf) {
     State state = new State(new FieldsPath(), REPEATED, "ParquetSchema");
 
     ConvertedField converted = struct.accept(
diff --git 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
index 61cfd4c1b..662450217 100644
--- 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
+++ 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TEnum;
 import org.apache.thrift.TUnion;
@@ -52,19 +54,28 @@ import static 
org.apache.parquet.schema.Type.Repetition.REPEATED;
 public class ThriftSchemaConverter {
   private final FieldProjectionFilter fieldProjectionFilter;
 
-  private Configuration conf;
+  private ParquetConfiguration conf;
 
   public ThriftSchemaConverter() {
     this(FieldProjectionFilter.ALL_COLUMNS);
   }
 
   public ThriftSchemaConverter(Configuration configuration) {
+    this(new HadoopParquetConfiguration(configuration));
+  }
+
+  public ThriftSchemaConverter(ParquetConfiguration configuration) {
     this();
     conf = configuration;
   }
 
   public ThriftSchemaConverter(
       Configuration configuration, FieldProjectionFilter 
fieldProjectionFilter) {
+    this(new HadoopParquetConfiguration(configuration), fieldProjectionFilter);
+  }
+
+  public ThriftSchemaConverter(
+      ParquetConfiguration configuration, FieldProjectionFilter 
fieldProjectionFilter) {
     this(fieldProjectionFilter);
     conf = configuration;
   }
diff --git 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/pig/TupleToThriftWriteSupport.java
 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/pig/TupleToThriftWriteSupport.java
index d75206124..f582e7c1a 100644
--- 
a/parquet-thrift/src/main/java/org/apache/parquet/thrift/pig/TupleToThriftWriteSupport.java
+++ 
b/parquet-thrift/src/main/java/org/apache/parquet/thrift/pig/TupleToThriftWriteSupport.java
@@ -19,6 +19,8 @@
 package org.apache.parquet.thrift.pig;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.pig.data.Tuple;
 import org.apache.thrift.TBase;
 
@@ -51,9 +53,14 @@ public class TupleToThriftWriteSupport extends 
WriteSupport<Tuple> {
     return "thrift";
   }
 
-  @SuppressWarnings({"rawtypes", "unchecked"})
   @Override
   public WriteContext init(Configuration configuration) {
+    return init(new HadoopParquetConfiguration(configuration));
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public WriteContext init(ParquetConfiguration configuration) {
     try {
       Class<?> clazz = 
configuration.getClassByName(className).asSubclass(TBase.class);
       thriftWriteSupport = new ThriftWriteSupport(clazz);
diff --git 
a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
 
b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
index 1311d7690..98f22d12a 100644
--- 
a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
+++ 
b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
 import com.twitter.elephantbird.thrift.test.TestMapInList;
 import com.twitter.elephantbird.thrift.test.TestNameSet;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.junit.ComparisonFailure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -708,7 +709,7 @@ public class TestParquetWriteProtocol {
     MessageType schema = new PigSchemaConverter().convert(pigSchema);
     LOG.info("{}", schema);
     TupleWriteSupport tupleWriteSupport = new TupleWriteSupport(pigSchema);
-    tupleWriteSupport.init(null);
+    tupleWriteSupport.init((ParquetConfiguration) null);
     tupleWriteSupport.prepareForWrite(recordConsumer);
     final Tuple pigTuple = thriftToPig.getPigTuple(a);
     LOG.info("{}", pigTuple);
diff --git a/pom.xml b/pom.xml
index 7fe6bf28f..0f8228689 100644
--- a/pom.xml
+++ b/pom.xml
@@ -540,6 +540,8 @@
             </excludeModules>
             <excludes>
               <exclude>${shade.prefix}</exclude>
+              <exclude>org.apache.parquet.hadoop.CodecFactory</exclude> <!-- 
change field type from Configuration to ParquetConfiguration -->
+              <exclude>org.apache.parquet.hadoop.ParquetReader</exclude> <!-- 
change field type from Configuration to ParquetConfiguration -->
               
<exclude>org.apache.parquet.thrift.projection.deprecated.PathGlobPattern</exclude>
               <!-- japicmp is overly aggressive on interface types in 
signatures, a type was changed to a supertype but this still triggers it -->
               
<exclude>org.apache.parquet.hadoop.ColumnChunkPageWriteStore</exclude>


Reply via email to