This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 1b08469cf PARQUET-2347: Add interface layer between Parquet and Hadoop Configuration (#1141)
1b08469cf is described below
commit 1b08469cf9287bac218210abd76ed696682c257d
Author: Atour <[email protected]>
AuthorDate: Mon Oct 30 02:53:07 2023 +0100
PARQUET-2347: Add interface layer between Parquet and Hadoop Configuration (#1141)
---
.../parquet/avro/AvroParquetOutputFormat.java | 2 -
.../org/apache/parquet/avro/AvroParquetReader.java | 24 +++
.../org/apache/parquet/avro/AvroParquetWriter.java | 13 ++
.../org/apache/parquet/avro/AvroReadSupport.java | 21 ++-
.../apache/parquet/avro/AvroSchemaConverter.java | 6 +
.../org/apache/parquet/avro/AvroWriteSupport.java | 12 +-
.../org/apache/parquet/avro/TestReadWrite.java | 62 +++++--
.../parquet/avro/TestReadWriteOldListBehavior.java | 7 +
.../apache/parquet/conf/ParquetConfiguration.java | 178 +++++++++++++++++++++
.../java/org/apache/parquet/util/Reflection.java | 50 ++++++
.../java/org/apache/parquet/HadoopReadOptions.java | 33 +---
.../org/apache/parquet/ParquetReadOptions.java | 80 ++++++++-
.../parquet/conf/HadoopParquetConfiguration.java | 140 ++++++++++++++++
.../org/apache/parquet/hadoop/CodecFactory.java | 22 ++-
.../java/org/apache/parquet/hadoop/DirectZstd.java | 12 +-
.../hadoop/InternalParquetRecordReader.java | 10 +-
.../apache/parquet/hadoop/ParquetInputFormat.java | 25 +--
.../org/apache/parquet/hadoop/ParquetReader.java | 58 +++++--
.../org/apache/parquet/hadoop/ParquetWriter.java | 61 ++++++-
.../parquet/hadoop/api/DelegatingReadSupport.java | 10 ++
.../parquet/hadoop/api/DelegatingWriteSupport.java | 6 +
.../org/apache/parquet/hadoop/api/InitContext.java | 19 ++-
.../org/apache/parquet/hadoop/api/ReadSupport.java | 47 +++++-
.../apache/parquet/hadoop/api/WriteSupport.java | 10 ++
.../hadoop/example/ExampleParquetWriter.java | 6 +
.../parquet/hadoop/example/GroupReadSupport.java | 16 ++
.../parquet/hadoop/example/GroupWriteSupport.java | 11 ++
.../parquet/hadoop/util/ConfigurationUtil.java | 26 ++-
.../apache/parquet/hadoop/util/HadoopCodecs.java | 5 +
.../parquet/hadoop/util/SerializationUtil.java | 18 ++-
.../java/org/apache/parquet/DirectWriterTest.java | 6 +
.../SchemaControlEncryptionTest.java | 16 ++
.../parquet/hadoop/TestEncryptionOptions.java | 2 +-
.../org/apache/parquet/pig/TupleReadSupport.java | 36 ++++-
.../org/apache/parquet/pig/TupleWriteSupport.java | 7 +
.../parquet/pig/TestTupleRecordConsumer.java | 3 +-
.../apache/parquet/pig/TupleConsumerPerfTest.java | 7 +-
.../parquet/proto/ProtoMessageConverter.java | 17 +-
.../apache/parquet/proto/ProtoParquetWriter.java | 14 +-
.../org/apache/parquet/proto/ProtoReadSupport.java | 9 +-
.../apache/parquet/proto/ProtoRecordConverter.java | 6 +
.../parquet/proto/ProtoRecordMaterializer.java | 6 +
.../apache/parquet/proto/ProtoSchemaConverter.java | 14 +-
.../apache/parquet/proto/ProtoWriteSupport.java | 7 +
.../hadoop/thrift/AbstractThriftWriteSupport.java | 21 ++-
.../parquet/hadoop/thrift/TBaseWriteSupport.java | 12 +-
.../hadoop/thrift/ThriftBytesWriteSupport.java | 22 ++-
.../parquet/hadoop/thrift/ThriftReadSupport.java | 68 +++++++-
.../parquet/hadoop/thrift/ThriftWriteSupport.java | 6 +
.../parquet/thrift/ParquetWriteProtocol.java | 7 +
.../parquet/thrift/TBaseRecordConverter.java | 9 +-
.../parquet/thrift/ThriftRecordConverter.java | 15 +-
.../parquet/thrift/ThriftSchemaConvertVisitor.java | 12 ++
.../parquet/thrift/ThriftSchemaConverter.java | 13 +-
.../thrift/pig/TupleToThriftWriteSupport.java | 9 +-
.../parquet/thrift/TestParquetWriteProtocol.java | 3 +-
pom.xml | 2 +
57 files changed, 1206 insertions(+), 133 deletions(-)
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
index afbaefcb0..9195925f7 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
@@ -19,9 +19,7 @@
package org.apache.parquet.avro;
import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.util.ContextUtil;
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
index 8970b66ea..3c98948b6 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
@@ -26,6 +26,7 @@ import org.apache.avro.specific.SpecificData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;
@@ -53,6 +54,10 @@ public class AvroParquetReader<T> extends ParquetReader<T> {
return new Builder<T>(file);
}
+ public static <T> Builder<T> builder(InputFile file, ParquetConfiguration conf) {
+ return new Builder<T>(file, conf);
+ }
+
/**
* Convenience method for creating a ParquetReader which uses Avro
* {@link GenericData} objects to store data from reads.
@@ -67,6 +72,21 @@ public class AvroParquetReader<T> extends ParquetReader<T> {
return new Builder<GenericRecord>(file).withDataModel(GenericData.get()).build();
}
+ /**
+ * Convenience method for creating a ParquetReader which uses Avro
+ * {@link GenericData} objects to store data from reads.
+ *
+ * @param file The location to read data from
+ * @param conf The configuration to use
+ * @return A {@code ParquetReader} which reads data as Avro
+ * {@code GenericData}
+ * @throws IOException if the InputFile has been closed, or if some other I/O error occurs
+ */
+ public static ParquetReader<GenericRecord> genericRecordReader(InputFile file, ParquetConfiguration conf) throws IOException {
+ return new Builder<GenericRecord>(file, conf).withDataModel(GenericData.get()).build();
+ }
+
/**
* Convenience method for creating a ParquetReader which uses Avro
* {@link GenericData} objects to store data from reads.
@@ -143,6 +163,10 @@ public class AvroParquetReader<T> extends ParquetReader<T> {
super(file);
}
+ private Builder(InputFile file, ParquetConfiguration conf) {
+ super(file, conf);
+ }
+
public Builder<T> withDataModel(GenericData model) {
this.model = model;
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
index 94d8167b0..0d87d007f 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
@@ -25,6 +25,8 @@ import org.apache.avro.specific.SpecificData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -154,6 +156,12 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> {
private static <T> WriteSupport<T> writeSupport(Configuration conf,
Schema avroSchema,
GenericData model) {
+ return writeSupport(new HadoopParquetConfiguration(conf), avroSchema, model);
+ }
+
+ private static <T> WriteSupport<T> writeSupport(ParquetConfiguration conf,
+ Schema avroSchema,
+ GenericData model) {
return new AvroWriteSupport<T>(
new AvroSchemaConverter(conf).convert(avroSchema), avroSchema, model);
}
@@ -189,5 +197,10 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> {
protected WriteSupport<T> getWriteSupport(Configuration conf) {
return AvroParquetWriter.writeSupport(conf, schema, model);
}
+
+ @Override
+ protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
+ return AvroParquetWriter.writeSupport(conf, schema, model);
+ }
}
}
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index 8f268a145..0bda3d02e 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -24,7 +24,10 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
@@ -95,6 +98,13 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
public ReadContext init(Configuration configuration,
Map<String, String> keyValueMetaData,
MessageType fileSchema) {
+ return init(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema);
+ }
+
+ @Override
+ public ReadContext init(ParquetConfiguration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema) {
MessageType projection = fileSchema;
Map<String, String> metadata = new LinkedHashMap<String, String>();
@@ -120,6 +130,13 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
public RecordMaterializer<T> prepareForRead(
Configuration configuration, Map<String, String> keyValueMetaData,
MessageType fileSchema, ReadContext readContext) {
+ return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext);
+ }
+
+ @Override
+ public RecordMaterializer<T> prepareForRead(
+ ParquetConfiguration configuration, Map<String, String> keyValueMetaData,
+ MessageType fileSchema, ReadContext readContext) {
Map<String, String> metadata = readContext.getReadSupportMetadata();
MessageType parquetSchema = readContext.getRequestedSchema();
Schema avroSchema;
@@ -153,7 +170,7 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
parquetSchema, avroSchema, model);
}
- private GenericData getDataModel(Configuration conf, Schema schema) {
+ private GenericData getDataModel(ParquetConfiguration conf, Schema schema) {
if (model != null) {
return model;
}
@@ -175,6 +192,6 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
Class<? extends AvroDataSupplier> suppClass = conf.getClass(
AVRO_DATA_SUPPLIER, SpecificDataSupplier.class,
AvroDataSupplier.class);
- return ReflectionUtils.newInstance(suppClass, conf).get();
+ return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get();
}
}
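A short sketch of how the data-model supplier is now selected: the class is looked up through ParquetConfiguration and, per the change above, instantiated against a Hadoop Configuration recreated by ConfigurationUtil. AVRO_DATA_SUPPLIER and SpecificDataSupplier are the constant and default named in the diff; the class name is illustrative:

    import org.apache.parquet.avro.AvroDataSupplier;
    import org.apache.parquet.avro.AvroReadSupport;
    import org.apache.parquet.avro.SpecificDataSupplier;
    import org.apache.parquet.conf.HadoopParquetConfiguration;
    import org.apache.parquet.conf.ParquetConfiguration;

    public class DataSupplierExample {
      public static void main(String[] args) {
        ParquetConfiguration conf = new HadoopParquetConfiguration();
        // Select the supplier consulted by getDataModel; SpecificDataSupplier is already the default.
        conf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
      }
    }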
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 0314bcd71..abf94eaa2 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -23,6 +23,8 @@ import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -102,6 +104,10 @@ public class AvroSchemaConverter {
}
public AvroSchemaConverter(Configuration conf) {
+ this(new HadoopParquetConfiguration(conf));
+ }
+
+ public AvroSchemaConverter(ParquetConfiguration conf) {
this.assumeRepeatedIsListElement = conf.getBoolean(
ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
this.writeOldListStructure = conf.getBoolean(
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 564e74539..692e3fac0 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -34,7 +34,10 @@ import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
@@ -129,6 +132,11 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
@Override
public WriteContext init(Configuration configuration) {
+ return init(new HadoopParquetConfiguration(configuration));
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration configuration) {
if (rootAvroSchema == null) {
this.rootAvroSchema = new Schema.Parser().parse(configuration.get(AVRO_SCHEMA));
this.rootSchema = new AvroSchemaConverter(configuration).convert(rootAvroSchema);
@@ -404,7 +412,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
return Binary.fromCharSequence(value.toString());
}
- private static GenericData getDataModel(Configuration conf, Schema schema) {
+ private static GenericData getDataModel(ParquetConfiguration conf, Schema schema) {
if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
GenericData modelForSchema;
try {
@@ -423,7 +431,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
Class<? extends AvroDataSupplier> suppClass = conf.getClass(
AVRO_DATA_SUPPLIER, SpecificDataSupplier.class,
AvroDataSupplier.class);
- return ReflectionUtils.newInstance(suppClass, conf).get();
+ return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get();
}
private abstract class ListWriter {
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 81e751aba..9aaa9e3b2 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -52,6 +52,8 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
@@ -77,22 +79,31 @@ public class TestReadWrite {
@Parameterized.Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
- { false, false }, // use the new converters
- { true, false }, // use the old converters
- { false, true } }; // use a local disk location
+ { false, false, false }, // use the new converters with hadoop config
+ { true, false, false }, // use the old converters with hadoop config
+ { false, true, false }, // use a local disk location with hadoop config
+ { false, false, true }, // use the new converters with parquet config interface
+ { true, false, true }, // use the old converters with parquet config interface
+ { false, true, true } }; // use a local disk location with parquet config interface
return Arrays.asList(data);
}
private final boolean compat;
private final boolean local;
+ private final boolean confInterface;
private final Configuration testConf = new Configuration();
+ private final ParquetConfiguration parquetConf = new HadoopParquetConfiguration(true);
- public TestReadWrite(boolean compat, boolean local) {
+ public TestReadWrite(boolean compat, boolean local, boolean confInterface) {
this.compat = compat;
this.local = local;
+ this.confInterface = confInterface;
this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
- testConf.setBoolean("parquet.avro.add-list-element-records", false);
- testConf.setBoolean("parquet.avro.write-old-list-structure", false);
+ this.testConf.setBoolean("parquet.avro.add-list-element-records", false);
+ this.testConf.setBoolean("parquet.avro.write-old-list-structure", false);
+ this.parquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+ this.parquetConf.setBoolean("parquet.avro.add-list-element-records",
false);
+ this.parquetConf.setBoolean("parquet.avro.write-old-list-structure",
false);
}
@Test
@@ -431,6 +442,11 @@ public class TestReadWrite {
@Override
public WriteContext init(Configuration configuration) {
+ return init(new HadoopParquetConfiguration(configuration));
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration configuration) {
return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
new HashMap<String, String>());
}
@@ -864,30 +880,44 @@ public class TestReadWrite {
}
private ParquetWriter<GenericRecord> writer(String file, Schema schema) throws IOException {
+ AvroParquetWriter.Builder<GenericRecord> writerBuilder;
if (local) {
- return AvroParquetWriter
+ writerBuilder = AvroParquetWriter
.<GenericRecord>builder(new LocalOutputFile(Paths.get(file)))
- .withSchema(schema)
- .withConf(testConf)
- .build();
+ .withSchema(schema);
} else {
- return AvroParquetWriter
+ writerBuilder = AvroParquetWriter
.<GenericRecord>builder(new Path(file))
- .withSchema(schema)
+ .withSchema(schema);
+ }
+ if (confInterface) {
+ return writerBuilder
+ .withConf(parquetConf)
+ .build();
+ } else {
+ return writerBuilder
.withConf(testConf)
.build();
}
}
private ParquetReader<GenericRecord> reader(String file) throws IOException {
+ AvroParquetReader.Builder<GenericRecord> readerBuilder;
if (local) {
- return AvroParquetReader
+ readerBuilder = AvroParquetReader
.<GenericRecord>builder(new LocalInputFile(Paths.get(file)))
- .withDataModel(GenericData.get())
- .withConf(testConf)
+ .withDataModel(GenericData.get());
+ } else {
+ return new AvroParquetReader<>(testConf, new Path(file));
+ }
+ if (confInterface) {
+ return readerBuilder
+ .withConf(parquetConf)
.build();
} else {
- return new AvroParquetReader(testConf, new Path(file));
+ return readerBuilder
+ .withConf(testConf)
+ .build();
}
}
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
index f12417cae..31a221d5b 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
@@ -38,6 +38,8 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
@@ -358,6 +360,11 @@ public class TestReadWriteOldListBehavior {
@Override
public WriteContext init(Configuration configuration) {
+ return init(new HadoopParquetConfiguration(configuration));
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration configuration) {
return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
new HashMap<String, String>());
}
diff --git a/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java b/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java
new file mode 100644
index 000000000..f8aae9729
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/conf/ParquetConfiguration.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.conf;
+
+import java.util.Map;
+
+/**
+ * Configuration interface with the methods necessary to configure Parquet applications.
+ */
+public interface ParquetConfiguration extends Iterable<Map.Entry<String,String>> {
+
+ /**
+ * Sets the value of the name property.
+ *
+ * @param name the property to set
+ * @param value the value to set the property to
+ */
+ void set(String name, String value);
+
+ /**
+ * Sets the value of the name property to a long.
+ *
+ * @param name the property to set
+ * @param value the value to set the property to
+ */
+ void setLong(String name, long value);
+
+ /**
+ * Sets the value of the name property to an integer.
+ *
+ * @param name the property to set
+ * @param value the value to set the property to
+ */
+ void setInt(String name, int value);
+
+ /**
+ * Sets the value of the name property to a boolean.
+ *
+ * @param name the property to set
+ * @param value the value to set the property to
+ */
+ void setBoolean(String name, boolean value);
+
+ /**
+ * Sets the value of the name property to an array of comma delimited values.
+ *
+ * @param name the property to set
+ * @param value the values to set the property to
+ */
+ void setStrings(String name, String... value);
+
+ /**
+ * Sets the value of the name property to a class.
+ *
+ * @param name the property to set
+ * @param value the value to set the property to
+ * @param xface the interface implemented by the value
+ */
+ void setClass(String name, Class<?> value, Class<?> xface);
+
+ /**
+ * Gets the value of the name property. Returns null if no such value exists.
+ *
+ * @param name the property to retrieve the value of
+ * @return the value of the property, or null if it does not exist
+ */
+ String get(String name);
+
+ /**
+ * Gets the value of the name property. Returns the default value if no such value exists.
+ *
+ * @param name the property to retrieve the value of
+ * @param defaultValue the default return if no value is set for the property
+ * @return the value of the property, or the default value if it does not exist
+ */
+ String get(String name, String defaultValue);
+
+ /**
+ * Gets the value of the name property as a long. Returns the default value if no such value exists.
+ *
+ * @param name the property to retrieve the value of
+ * @param defaultValue the default return if no value is set for the property
+ * @return the value of the property as a long, or the default value if it does not exist
+ */
+ long getLong(String name, long defaultValue);
+
+ /**
+ * Gets the value of the name property as an integer. Returns the default value if no such value exists.
+ *
+ * @param name the property to retrieve the value of
+ * @param defaultValue the default return if no value is set for the property
+ * @return the value of the property as an integer, or the default value if it does not exist
+ */
+ int getInt(String name, int defaultValue);
+
+ /**
+ * Gets the value of the name property as a boolean. Returns the default value if no such value exists.
+ *
+ * @param name the property to retrieve the value of
+ * @param defaultValue the default return if no value is set for the property
+ * @return the value of the property as a boolean, or the default value if it does not exist
+ */
+ boolean getBoolean(String name, boolean defaultValue);
+
+ /**
+ * Gets the trimmed value of the name property. Returns null if no such value exists.
+ *
+ * @param name the property to retrieve the value of
+ * @return the trimmed value of the property, or null if it does not exist
+ */
+ String getTrimmed(String name);
+
+ /**
+ * Gets the trimmed value of the name property.
+ * Returns the default value if no such value exists.
+ *
+ * @param name the property to retrieve the value of
+ * @param defaultValue the default return if no value is set for the property
+ * @return the trimmed value of the property, or the default value if it does not exist
+ */
+ String getTrimmed(String name, String defaultValue);
+
+ /**
+ * Gets the value of the name property as an array of {@link String}s.
+ * Returns the default value if no such value exists.
+ * Interprets the stored value as a comma delimited array.
+ *
+ * @param name the property to retrieve the value of
+ * @param defaultValue the default return if no value is set for the property
+ * @return the value of the property as an array, or the default value if it does not exist
+ */
+ String[] getStrings(String name, String[] defaultValue);
+
+ /**
+ * Gets the value of the name property as a class. Returns the default value if no such value exists.
+ *
+ * @param name the property to retrieve the value of
+ * @param defaultValue the default return if no value is set for the property
+ * @return the value of the property as a class, or the default value if it does not exist
+ */
+ Class<?> getClass(String name, Class<?> defaultValue);
+
+ /**
+ * Gets the value of the name property as a class implementing the xface interface.
+ * Returns the default value if no such value exists.
+ *
+ * @param name the property to retrieve the value of
+ * @param defaultValue the default return if no value is set for the property
+ * @return the value of the property as a class, or the default value if it does not exist
+ */
+ <U> Class<? extends U> getClass(String name, Class<? extends U> defaultValue, Class<U> xface);
+
+ /**
+ * Load a class by name.
+ *
+ * @param name the name of the {@link Class} to load
+ * @return the loaded class
+ * @throws ClassNotFoundException when the specified class cannot be found
+ */
+ Class<?> getClassByName(String name) throws ClassNotFoundException;
+}
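For orientation, a minimal sketch of the interface in use; the class and property names are hypothetical, and HadoopParquetConfiguration is the Hadoop-backed implementation this commit adds to parquet-hadoop:

    import java.util.Map;
    import org.apache.parquet.conf.HadoopParquetConfiguration;
    import org.apache.parquet.conf.ParquetConfiguration;

    public class ParquetConfigurationExample {
      public static void main(String[] args) {
        ParquetConfiguration conf = new HadoopParquetConfiguration();
        conf.setBoolean("parquet.example.flag", true); // hypothetical property name
        boolean flag = conf.getBoolean("parquet.example.flag", false);
        // The interface extends Iterable, so all entries can be walked directly.
        for (Map.Entry<String, String> entry : conf) {
          System.out.println(entry.getKey() + "=" + entry.getValue());
        }
      }
    }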
diff --git a/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java b/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java
new file mode 100644
index 000000000..695ebd9f4
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/util/Reflection.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.util;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Lifted from Hadoop's org.apache.hadoop.util.ReflectionUtils.
+ */
+public class Reflection {
+
+ private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
+ private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+ @SuppressWarnings("unchecked")
+ public static <T> T newInstance(Class<T> theClass) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return result;
+ }
+}
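A usage sketch: unlike Hadoop's ReflectionUtils.newInstance, this helper takes no Configuration, so it suits any class with an accessible no-argument constructor; SpecificDataSupplier from parquet-avro is used here only for concreteness:

    import org.apache.parquet.avro.SpecificDataSupplier;
    import org.apache.parquet.util.Reflection;

    public class ReflectionExample {
      public static void main(String[] args) {
        // The resolved constructor is cached, so repeated instantiations are cheap.
        SpecificDataSupplier supplier = Reflection.newInstance(SpecificDataSupplier.class);
        System.out.println(supplier.get());
      }
    }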
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 8f0d8d893..9c9865010 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -23,29 +23,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.crypto.DecryptionPropertiesFactory;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
-import org.apache.parquet.hadoop.util.HadoopCodecs;
import java.util.Map;
-import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
-import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
-
public class HadoopReadOptions extends ParquetReadOptions {
private final Configuration conf;
- private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";
-
private HadoopReadOptions(boolean useSignedStringMinMax,
boolean useStatsFilter,
boolean useDictionaryFilter,
@@ -65,7 +53,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
super(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, codecFactory, allocator,
- maxAllocationSize, properties, fileDecryptionProperties
+ maxAllocationSize, properties, fileDecryptionProperties, new HadoopParquetConfiguration(conf)
);
this.conf = conf;
}
@@ -100,24 +88,9 @@ public class HadoopReadOptions extends ParquetReadOptions {
}
public Builder(Configuration conf, Path filePath) {
+ super(new HadoopParquetConfiguration(conf));
this.conf = conf;
this.filePath = filePath;
- useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
- useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
- useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
- useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
- useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
- usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED,
- usePageChecksumVerification));
- useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true));
- useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false));
- withCodecFactory(HadoopCodecs.newFactory(conf, 0));
- withRecordFilter(getFilter(conf));
- withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
- String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
- if (badRecordThresh != null) {
- set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
- }
}
@Override
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index dc130ee8d..8e93dc4ad 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -22,6 +22,8 @@ package org.apache.parquet;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -34,9 +36,21 @@ import java.util.Optional;
import java.util.Set;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
// Internal use only
public class ParquetReadOptions {
+
+ private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";
+
private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
@@ -61,6 +75,7 @@ public class ParquetReadOptions {
private final int maxAllocationSize;
private final Map<String, String> properties;
private final FileDecryptionProperties fileDecryptionProperties;
+ private final ParquetConfiguration conf;
ParquetReadOptions(boolean useSignedStringMinMax,
boolean useStatsFilter,
@@ -77,6 +92,28 @@ public class ParquetReadOptions {
int maxAllocationSize,
Map<String, String> properties,
FileDecryptionProperties fileDecryptionProperties) {
+ this(useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
+ usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter,
+ codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties,
+ new HadoopParquetConfiguration());
+ }
+
+ ParquetReadOptions(boolean useSignedStringMinMax,
+ boolean useStatsFilter,
+ boolean useDictionaryFilter,
+ boolean useRecordFilter,
+ boolean useColumnIndexFilter,
+ boolean usePageChecksumVerification,
+ boolean useBloomFilter,
+ boolean useOffHeapDecryptBuffer,
+ FilterCompat.Filter recordFilter,
+ ParquetMetadataConverter.MetadataFilter metadataFilter,
+ CompressionCodecFactory codecFactory,
+ ByteBufferAllocator allocator,
+ int maxAllocationSize,
+ Map<String, String> properties,
+ FileDecryptionProperties fileDecryptionProperties,
+ ParquetConfiguration conf) {
this.useSignedStringMinMax = useSignedStringMinMax;
this.useStatsFilter = useStatsFilter;
this.useDictionaryFilter = useDictionaryFilter;
@@ -92,6 +129,7 @@ public class ParquetReadOptions {
this.maxAllocationSize = maxAllocationSize;
this.properties = Collections.unmodifiableMap(properties);
this.fileDecryptionProperties = fileDecryptionProperties;
+ this.conf = conf;
}
public boolean useSignedStringMinMax() {
@@ -164,10 +202,18 @@ public class ParquetReadOptions {
: defaultValue;
}
+ public ParquetConfiguration getConfiguration() {
+ return conf;
+ }
+
public static Builder builder() {
return new Builder();
}
+ public static Builder builder(ParquetConfiguration conf) {
+ return new Builder(conf);
+ }
+
public static class Builder {
protected boolean useSignedStringMinMax = false;
protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
@@ -185,6 +231,31 @@ public class ParquetReadOptions {
protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT;
protected Map<String, String> properties = new HashMap<>();
protected FileDecryptionProperties fileDecryptionProperties = null;
+ protected ParquetConfiguration conf;
+
+ public Builder() {
+ this(new HadoopParquetConfiguration());
+ }
+
+ public Builder(ParquetConfiguration conf) {
+ this.conf = conf;
+ useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
+ useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
+ useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
+ useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
+ useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
+ usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED,
+ usePageChecksumVerification));
+ useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true));
+ useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false));
+ withCodecFactory(HadoopCodecs.newFactory(conf, 0));
+ withRecordFilter(getFilter(conf));
+ withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
+ String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
+ if (badRecordThresh != null) {
+ set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
+ }
+ }
public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {
this.useSignedStringMinMax = useSignedStringMinMax;
@@ -325,6 +396,7 @@ public class ParquetReadOptions {
withAllocator(options.allocator);
withPageChecksumVerification(options.usePageChecksumVerification);
withDecryption(options.fileDecryptionProperties);
+ conf = options.conf;
for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
set(keyValue.getKey(), keyValue.getValue());
}
@@ -333,13 +405,17 @@ public class ParquetReadOptions {
public ParquetReadOptions build() {
if (codecFactory == null) {
- codecFactory = HadoopCodecs.newFactory(0);
+ if (conf == null) {
+ codecFactory = HadoopCodecs.newFactory(0);
+ } else {
+ codecFactory = HadoopCodecs.newFactory(conf, 0);
+ }
}
return new ParquetReadOptions(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter,
useRecordFilter,
useColumnIndexFilter, usePageChecksumVerification, useBloomFilter,
useOffHeapDecryptBuffer, recordFilter, metadataFilter,
- codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties);
+ codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties, conf);
}
}
}
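A sketch of building read options directly from a ParquetConfiguration, which is the path HadoopReadOptions.Builder now delegates to; DICTIONARY_FILTERING_ENABLED is the same ParquetInputFormat constant statically imported above, and the class name is illustrative:

    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.conf.HadoopParquetConfiguration;
    import org.apache.parquet.conf.ParquetConfiguration;

    import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;

    public class ReadOptionsExample {
      public static void main(String[] args) {
        ParquetConfiguration conf = new HadoopParquetConfiguration();
        conf.setBoolean(DICTIONARY_FILTERING_ENABLED, false); // picked up as a default by Builder(conf)
        ParquetReadOptions options = ParquetReadOptions.builder(conf).build();
        System.out.println(options.useDictionaryFilter()); // false
        System.out.println(options.getConfiguration() == conf); // true: the configuration is carried through
      }
    }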
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java b/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java
new file mode 100644
index 000000000..26fce1e9b
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/conf/HadoopParquetConfiguration.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Implementation of the Parquet configuration interface relying on Hadoop's
+ * Configuration to aid with interoperability and backwards compatibility.
+ */
+public class HadoopParquetConfiguration implements ParquetConfiguration {
+
+ private final Configuration configuration;
+
+ public HadoopParquetConfiguration() {
+ this(true);
+ }
+
+ public HadoopParquetConfiguration(boolean loadDefaults) {
+ configuration = new Configuration(loadDefaults);
+ }
+
+ public HadoopParquetConfiguration(Configuration conf) {
+ configuration = conf;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public void set(String name, String value) {
+ configuration.set(name, value);
+ }
+
+ @Override
+ public void setLong(String name, long value) {
+ configuration.setLong(name, value);
+ }
+
+ @Override
+ public void setInt(String name, int value) {
+ configuration.setInt(name, value);
+ }
+
+ @Override
+ public void setBoolean(String name, boolean value) {
+ configuration.setBoolean(name, value);
+ }
+
+ @Override
+ public void setStrings(String name, String... values) {
+ configuration.setStrings(name, values);
+ }
+
+ @Override
+ public void setClass(String name, Class<?> value, Class<?> xface) {
+ configuration.setClass(name, value, xface);
+ }
+
+ @Override
+ public String get(String name) {
+ return configuration.get(name);
+ }
+
+ @Override
+ public String get(String name, String defaultValue) {
+ return configuration.get(name, defaultValue);
+ }
+
+ @Override
+ public long getLong(String name, long defaultValue) {
+ return configuration.getLong(name, defaultValue);
+ }
+
+ @Override
+ public int getInt(String name, int defaultValue) {
+ return configuration.getInt(name, defaultValue);
+ }
+
+ @Override
+ public boolean getBoolean(String name, boolean defaultValue) {
+ return configuration.getBoolean(name, defaultValue);
+ }
+
+ @Override
+ public String getTrimmed(String name) {
+ return configuration.getTrimmed(name);
+ }
+
+ @Override
+ public String getTrimmed(String name, String defaultValue) {
+ return configuration.getTrimmed(name, defaultValue);
+ }
+
+ @Override
+ public String[] getStrings(String name, String[] defaultValue) {
+ return configuration.getStrings(name, defaultValue);
+ }
+
+ @Override
+ public Class<?> getClass(String name, Class<?> defaultValue) {
+ return configuration.getClass(name, defaultValue);
+ }
+
+ @Override
+ public <U> Class<? extends U> getClass(String name, Class<? extends U> defaultValue, Class<U> xface) {
+ return configuration.getClass(name, defaultValue, xface);
+ }
+
+ @Override
+ public Class<?> getClassByName(String name) throws ClassNotFoundException {
+ return configuration.getClassByName(name);
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, String>> iterator() {
+ return configuration.iterator();
+ }
+}
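A sketch of wrapping an existing Hadoop Configuration so code written against ParquetConfiguration sees the same settings; the key and class name are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.conf.HadoopParquetConfiguration;
    import org.apache.parquet.conf.ParquetConfiguration;

    public class HadoopWrapExample {
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("parquet.example.key", "value"); // hypothetical key
        ParquetConfiguration conf = new HadoopParquetConfiguration(hadoopConf);
        // Reads and writes pass straight through to the wrapped Configuration.
        System.out.println(conf.get("parquet.example.key")); // value
      }
    }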
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index d93b4071c..a30c57aeb 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -37,8 +37,11 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.codec.ZstandardCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
public class CodecFactory implements CompressionCodecFactory {
@@ -48,7 +51,7 @@ public class CodecFactory implements CompressionCodecFactory {
private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<>();
private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<>();
- protected final Configuration configuration;
+ protected final ParquetConfiguration configuration;
protected final int pageSize;
/**
@@ -61,6 +64,19 @@ public class CodecFactory implements CompressionCodecFactory {
* decompressors this parameter has no impact on the function of the factory
*/
public CodecFactory(Configuration configuration, int pageSize) {
+ this(new HadoopParquetConfiguration(configuration), pageSize);
+ }
+
+ /**
+ * Create a new codec factory.
+ *
+ * @param configuration used to pass compression codec configuration information
+ * @param pageSize the expected page size, does not set a hard limit, currently just
+ * used to set the initial size of the output stream used when
+ * compressing a buffer. If this factory is only used to construct
+ * decompressors this parameter has no impact on the function of the factory
+ */
+ public CodecFactory(ParquetConfiguration configuration, int pageSize) {
this.configuration = configuration;
this.pageSize = pageSize;
}
@@ -246,9 +262,9 @@ public class CodecFactory implements CompressionCodecFactory {
codecClass = Class.forName(codecClassName);
} catch (ClassNotFoundException e) {
// Try to load the class using the job classloader
- codecClass = configuration.getClassLoader().loadClass(codecClassName);
+ codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName);
}
- codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
+ codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(configuration));
CODEC_BY_NAME.put(codecCacheKey, codec);
return codec;
} catch (ClassNotFoundException e) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java
index 5454a69b2..588f93c89 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java
@@ -24,6 +24,8 @@ import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.codec.ZstdDecompressorStream;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -50,6 +52,10 @@ import static org.apache.parquet.hadoop.codec.ZstandardCodec.PARQUET_COMPRESS_ZS
class DirectZstd {
static CodecFactory.BytesCompressor createCompressor(Configuration conf, int pageSize) {
+ return createCompressor(new HadoopParquetConfiguration(conf), pageSize);
+ }
+
+ static CodecFactory.BytesCompressor createCompressor(ParquetConfiguration conf, int pageSize) {
return new ZstdCompressor(
getPool(conf),
conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
@@ -58,6 +64,10 @@ class DirectZstd {
}
static CodecFactory.BytesDecompressor createDecompressor(Configuration conf) {
+ return createDecompressor(new HadoopParquetConfiguration(conf));
+ }
+
+ static CodecFactory.BytesDecompressor createDecompressor(ParquetConfiguration conf) {
return new ZstdDecompressor(getPool(conf));
}
@@ -133,7 +143,7 @@ class DirectZstd {
}
}
- private static BufferPool getPool(Configuration conf) {
+ private static BufferPool getPool(ParquetConfiguration conf) {
if (conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) {
return RecyclingBufferPool.INSTANCE;
} else {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 8203e9098..36da819fa 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.Set;
@@ -29,9 +30,9 @@ import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
@@ -167,10 +168,7 @@ class InternalParquetRecordReader<T> {
public void initialize(ParquetFileReader reader, ParquetReadOptions options) {
// copy custom configuration to the Configuration passed to the ReadSupport
- Configuration conf = new Configuration();
- if (options instanceof HadoopReadOptions) {
- conf = ((HadoopReadOptions) options).getConf();
- }
+ ParquetConfiguration conf = Objects.requireNonNull(options).getConfiguration();
for (String property : options.getPropertyNames()) {
conf.set(property, options.getProperty(property));
}
@@ -261,7 +259,7 @@ class InternalParquetRecordReader<T> {
LOG.debug("read value: {}", currentValue);
} catch (RuntimeException e) {
- throw new ParquetDecodingException(format("Can not read value at %d in
block %d in file %s", current, currentBlock, reader.getPath()), e);
+ throw new ParquetDecodingException(format("Can not read value at %d in
block %d in file %s", current, currentBlock, reader.getFile()), e);
}
}
return true;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index 1bfd4b20f..7d355af78 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.parquet.Preconditions;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
@@ -193,18 +195,13 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
return ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
}
- private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
+ private static UnboundRecordFilter getUnboundRecordFilterInstance(ParquetConfiguration configuration) {
Class<?> clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
- if (clazz == null) { return null; }
-
+ if (clazz == null) {
+ return null;
+ }
try {
- UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance();
-
- if (unboundRecordFilter instanceof Configurable) {
- ((Configurable)unboundRecordFilter).setConf(configuration);
- }
-
- return unboundRecordFilter;
+ return (UnboundRecordFilter) clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new BadConfigurationException(
"could not instantiate unbound record filter class", e);
@@ -232,6 +229,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
}
private static FilterPredicate getFilterPredicate(Configuration configuration) {
+ return getFilterPredicate(new HadoopParquetConfiguration(configuration));
+ }
+
+ private static FilterPredicate getFilterPredicate(ParquetConfiguration configuration) {
try {
return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration);
} catch (IOException e) {
@@ -247,6 +248,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
* @return a filter for the unbound record filter specified in conf
*/
public static Filter getFilter(Configuration conf) {
+ return getFilter(new HadoopParquetConfiguration(conf));
+ }
+
+ public static Filter getFilter(ParquetConfiguration conf) {
return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf));
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index f9c8314dd..785e5d05d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -36,11 +36,14 @@ import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
@@ -180,6 +183,10 @@ public class ParquetReader<T> implements Closeable {
return new Builder<>(file);
}
+ public static <T> Builder<T> read(InputFile file, ParquetConfiguration conf) throws IOException {
+ return new Builder<>(file, conf);
+ }
+
public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
return new Builder<>(readSupport, path);
}
@@ -190,7 +197,7 @@ public class ParquetReader<T> implements Closeable {
private final Path path;
private Filter filter = null;
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
- protected Configuration conf;
+ protected ParquetConfiguration conf;
private ParquetReadOptions.Builder optionsBuilder;
@Deprecated
@@ -198,8 +205,9 @@ public class ParquetReader<T> implements Closeable {
this.readSupport = Objects.requireNonNull(readSupport, "readSupport
cannot be null");
this.file = null;
this.path = Objects.requireNonNull(path, "path cannot be null");
- this.conf = new Configuration();
- this.optionsBuilder = HadoopReadOptions.builder(conf, path);
+ Configuration hadoopConf = new Configuration();
+ this.conf = new HadoopParquetConfiguration(hadoopConf);
+ this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path);
}
@Deprecated
@@ -207,8 +215,9 @@ public class ParquetReader<T> implements Closeable {
this.readSupport = null;
this.file = null;
this.path = Objects.requireNonNull(path, "path cannot be null");
- this.conf = new Configuration();
- this.optionsBuilder = HadoopReadOptions.builder(conf, path);
+ Configuration hadoopConf = new Configuration();
+ this.conf = new HadoopParquetConfiguration(hadoopConf);
+ this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path);
}
protected Builder(InputFile file) {
@@ -217,17 +226,30 @@ public class ParquetReader<T> implements Closeable {
this.path = null;
if (file instanceof HadoopInputFile) {
HadoopInputFile hadoopFile = (HadoopInputFile) file;
- this.conf = hadoopFile.getConfiguration();
- optionsBuilder = HadoopReadOptions.builder(conf, hadoopFile.getPath());
+ Configuration hadoopConf = hadoopFile.getConfiguration();
+ this.conf = new HadoopParquetConfiguration(hadoopConf);
+ optionsBuilder = HadoopReadOptions.builder(hadoopConf, hadoopFile.getPath());
} else {
- this.conf = new Configuration();
- optionsBuilder = HadoopReadOptions.builder(conf);
+ optionsBuilder = ParquetReadOptions.builder(new HadoopParquetConfiguration());
+ }
+ }
+
+ protected Builder(InputFile file, ParquetConfiguration conf) {
+ this.readSupport = null;
+ this.file = Objects.requireNonNull(file, "file cannot be null");
+ this.path = null;
+ this.conf = conf;
+ if (file instanceof HadoopInputFile) {
+ HadoopInputFile hadoopFile = (HadoopInputFile) file;
+ optionsBuilder = HadoopReadOptions.builder(ConfigurationUtil.createHadoopConfiguration(conf), hadoopFile.getPath());
+ } else {
+ optionsBuilder = ParquetReadOptions.builder(conf);
}
}
// when called, resets options to the defaults from conf
public Builder<T> withConf(Configuration conf) {
- this.conf = Objects.requireNonNull(conf, "conf cannot be null");
+ this.conf = new HadoopParquetConfiguration(Objects.requireNonNull(conf, "conf cannot be null"));
// previous versions didn't use the builder, so may set filter before conf. this maintains
// compatibility for filter. other options are reset by a new conf.
@@ -239,6 +261,15 @@ public class ParquetReader<T> implements Closeable {
return this;
}
+ public Builder<T> withConf(ParquetConfiguration conf) {
+ this.conf = conf;
+ this.optionsBuilder = ParquetReadOptions.builder(conf);
+ if (filter != null) {
+ optionsBuilder.withRecordFilter(filter);
+ }
+ return this;
+ }
+
public Builder<T> withFilter(Filter filter) {
this.filter = filter;
optionsBuilder.withRecordFilter(filter);
@@ -354,19 +385,20 @@ public class ParquetReader<T> implements Closeable {
.build();
if (path != null) {
- FileSystem fs = path.getFileSystem(conf);
+ Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf);
+ FileSystem fs = path.getFileSystem(hadoopConf);
FileStatus stat = fs.getFileStatus(path);
if (stat.isFile()) {
return new ParquetReader<>(
- Collections.singletonList((InputFile) HadoopInputFile.fromStatus(stat, conf)),
+ Collections.singletonList((InputFile) HadoopInputFile.fromStatus(stat, hadoopConf)),
options,
getReadSupport());
} else {
List<InputFile> files = new ArrayList<>();
for (FileStatus fileStatus : fs.listStatus(path,
HiddenFileFilter.INSTANCE)) {
- files.add(HadoopInputFile.fromStatus(fileStatus, conf));
+ files.add(HadoopInputFile.fromStatus(fileStatus, hadoopConf));
}
return new ParquetReader<T>(files, options, getReadSupport());
}
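Putting the new reader surface together, a sketch using the example Group model (the path and GroupReadSupport are placeholders; any ReadSupport works the same way). withConf(ParquetConfiguration) resets the options builder exactly as the Hadoop overload does:

  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.conf.HadoopParquetConfiguration;
  import org.apache.parquet.conf.ParquetConfiguration;
  import org.apache.parquet.example.data.Group;
  import org.apache.parquet.hadoop.ParquetReader;
  import org.apache.parquet.hadoop.example.GroupReadSupport;

  class ReaderSketch {
    static Group readFirst(String file) throws java.io.IOException {
      // an empty HadoopParquetConfiguration wraps a freshly created Hadoop Configuration
      ParquetConfiguration conf = new HadoopParquetConfiguration();
      try (ParquetReader<Group> reader =
          ParquetReader.builder(new GroupReadSupport(), new Path(file))
              .withConf(conf)
              .build()) {
        return reader.read(); // null when the file is empty
      }
    }
  }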
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 7b78a9376..2e888722e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -26,11 +26,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.schema.MessageType;
@@ -276,6 +279,30 @@ public class ParquetWriter<T> implements Closeable {
int maxPaddingSize,
ParquetProperties encodingProps,
FileEncryptionProperties encryptionProperties) throws IOException {
+ this(
+ file,
+ mode,
+ writeSupport,
+ compressionCodecName,
+ rowGroupSize,
+ validating,
+ new HadoopParquetConfiguration(conf),
+ maxPaddingSize,
+ encodingProps,
+ encryptionProperties);
+ }
+
+ ParquetWriter(
+ OutputFile file,
+ ParquetFileWriter.Mode mode,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ long rowGroupSize,
+ boolean validating,
+ ParquetConfiguration conf,
+ int maxPaddingSize,
+ ParquetProperties encodingProps,
+ FileEncryptionProperties encryptionProperties) throws IOException {
WriteSupport.WriteContext writeContext = writeSupport.init(conf);
MessageType schema = writeContext.getSchema();
@@ -283,8 +310,9 @@ public class ParquetWriter<T> implements Closeable {
// encryptionProperties could be built from the implementation of EncryptionPropertiesFactory when it is attached.
if (encryptionProperties == null) {
String path = file == null ? null : file.getPath();
- encryptionProperties = ParquetOutputFormat.createEncryptionProperties(conf,
- path == null ? null : new Path(path), writeContext);
+ Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf);
+ encryptionProperties = ParquetOutputFormat.createEncryptionProperties(
+ hadoopConf, path == null ? null : new Path(path), writeContext);
}
ParquetFileWriter fileWriter = new ParquetFileWriter(
@@ -353,7 +381,7 @@ public class ParquetWriter<T> implements Closeable {
private OutputFile file = null;
private Path path = null;
private FileEncryptionProperties encryptionProperties = null;
- private Configuration conf = new Configuration();
+ private ParquetConfiguration conf = null;
private ParquetFileWriter.Mode mode;
private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
private long rowGroupSize = DEFAULT_BLOCK_SIZE;
@@ -381,6 +409,14 @@ public class ParquetWriter<T> implements Closeable {
*/
protected abstract WriteSupport<T> getWriteSupport(Configuration conf);
+ /**
+ * @param conf a configuration
+ * @return an appropriate WriteSupport for the object model.
+ */
+ protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
+ throw new UnsupportedOperationException("Override
ParquetWriter$Builder#getWriteSupport(ParquetConfiguration)");
+ }
+
/**
* Set the {@link Configuration} used by the constructed writer.
*
@@ -388,6 +424,17 @@ public class ParquetWriter<T> implements Closeable {
* @return this builder for method chaining.
*/
public SELF withConf(Configuration conf) {
+ this.conf = new HadoopParquetConfiguration(conf);
+ return self();
+ }
+
+ /**
+ * Set the {@link ParquetConfiguration} used by the constructed writer.
+ *
+ * @param conf a {@code ParquetConfiguration}
+ * @return this builder for method chaining.
+ */
+ public SELF withConf(ParquetConfiguration conf) {
this.conf = conf;
return self();
}
@@ -719,6 +766,9 @@ public class ParquetWriter<T> implements Closeable {
* @return this builder for method chaining.
*/
public SELF config(String property, String value) {
+ if (conf == null) {
+ conf = new HadoopParquetConfiguration();
+ }
conf.set(property, value);
return self();
}
@@ -730,12 +780,15 @@ public class ParquetWriter<T> implements Closeable {
* @throws IOException if there is an error while creating the writer
*/
public ParquetWriter<T> build() throws IOException {
+ if (conf == null) {
+ conf = new HadoopParquetConfiguration();
+ }
if (file != null) {
return new ParquetWriter<>(file,
mode, getWriteSupport(conf), codecName, rowGroupSize,
enableValidation, conf,
maxPaddingSize, encodingPropsBuilder.build(),
encryptionProperties);
} else {
- return new ParquetWriter<>(HadoopOutputFile.fromPath(path, conf),
+ return new ParquetWriter<>(HadoopOutputFile.fromPath(path, ConfigurationUtil.createHadoopConfiguration(conf)),
mode, getWriteSupport(conf), codecName,
rowGroupSize, enableValidation, conf, maxPaddingSize,
encodingPropsBuilder.build(), encryptionProperties);
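On the write side, conf is now materialized lazily: config() and build() fall back to a fresh HadoopParquetConfiguration when none was set. A sketch with the example model and an illustrative one-column schema:

  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.conf.HadoopParquetConfiguration;
  import org.apache.parquet.example.data.Group;
  import org.apache.parquet.example.data.simple.SimpleGroupFactory;
  import org.apache.parquet.hadoop.ParquetWriter;
  import org.apache.parquet.hadoop.example.ExampleParquetWriter;
  import org.apache.parquet.schema.MessageType;
  import org.apache.parquet.schema.MessageTypeParser;

  class WriterSketch {
    static void writeOne(String file) throws java.io.IOException {
      MessageType schema = MessageTypeParser.parseMessageType("message m { required int32 id; }");
      try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(file))
          .withType(schema)
          // either withConf overload feeds the same builder state
          .withConf(new HadoopParquetConfiguration())
          .build()) {
        writer.write(new SimpleGroupFactory(schema).newGroup().append("id", 1));
      }
    }
  }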
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
index 8100a351f..2593393d7 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
@@ -53,6 +54,15 @@ public class DelegatingReadSupport<T> extends ReadSupport<T> {
return delegate.prepareForRead(configuration, keyValueMetaData,
fileSchema, readContext);
}
+ @Override
+ public RecordMaterializer<T> prepareForRead(
+ ParquetConfiguration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema,
+ ReadSupport.ReadContext readContext) {
+ return delegate.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext);
+ }
+
@Override
public String toString() {
return this.getClass().getName() + "(" + delegate.toString() + ")";
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
index f5bbfc60d..926fe68c6 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
@@ -20,6 +20,7 @@ package org.apache.parquet.hadoop.api;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.io.api.RecordConsumer;
/**
@@ -42,6 +43,11 @@ public class DelegatingWriteSupport<T> extends WriteSupport<T> {
return delegate.init(configuration);
}
+ @Override
+ public WriteSupport.WriteContext init(ParquetConfiguration configuration) {
+ return delegate.init(configuration);
+ }
+
@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
delegate.prepareForWrite(recordConsumer);
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java
index 6bc5e5d3d..3d80811aa 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java
@@ -25,6 +25,9 @@ import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.schema.MessageType;
/**
@@ -35,7 +38,7 @@ public class InitContext {
private final Map<String,Set<String>> keyValueMetadata;
private Map<String,String> mergedKeyValueMetadata;
- private final Configuration configuration;
+ private final ParquetConfiguration configuration;
private final MessageType fileSchema;
/**
@@ -47,6 +50,13 @@ public class InitContext {
Configuration configuration,
Map<String, Set<String>> keyValueMetadata,
MessageType fileSchema) {
+ this(new HadoopParquetConfiguration(configuration), keyValueMetadata, fileSchema);
+ }
+
+ public InitContext(
+ ParquetConfiguration configuration,
+ Map<String, Set<String>> keyValueMetadata,
+ MessageType fileSchema) {
super();
this.keyValueMetadata = keyValueMetadata;
this.configuration = configuration;
@@ -77,6 +87,13 @@ public class InitContext {
* @return the configuration for this job
*/
public Configuration getConfiguration() {
+ return ConfigurationUtil.createHadoopConfiguration(configuration);
+ }
+
+ /**
+ * @return the Parquet configuration for this job
+ */
+ public ParquetConfiguration getParquetConfiguration() {
return configuration;
}
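New ReadSupport code should prefer getParquetConfiguration(): getConfiguration() now rebuilds a Hadoop Configuration from the stored instance on every call, while getParquetConfiguration() hands back the instance itself. A sketch with an illustrative key:

  import org.apache.parquet.conf.ParquetConfiguration;
  import org.apache.parquet.hadoop.api.InitContext;

  class ContextSketch {
    static String requestedSchema(InitContext context) {
      ParquetConfiguration conf = context.getParquetConfiguration(); // no conversion copy
      return conf.get("parquet.read.schema"); // illustrative property; may be null
    }
  }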
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java
index 62344522b..a3dfe2caa 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
@@ -69,10 +70,28 @@ abstract public class ReadSupport<T> {
*/
@Deprecated
public ReadContext init(
- Configuration configuration,
- Map<String, String> keyValueMetaData,
- MessageType fileSchema) {
- throw new UnsupportedOperationException("Override init(InitContext)");
+ Configuration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema) {
+ throw new UnsupportedOperationException("Override
ReadSupport.init(InitContext)");
+ }
+
+ /**
+ * called in {@link org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)} in the front end
+ *
+ * @param configuration the configuration
+ * @param keyValueMetaData the app specific metadata from the file
+ * @param fileSchema the schema of the file
+ * @return the readContext that defines how to read the file
+ *
+ * @deprecated override {@link ReadSupport#init(InitContext)} instead
+ */
+ @Deprecated
+ public ReadContext init(
+ ParquetConfiguration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema) {
+ throw new UnsupportedOperationException("Override
ReadSupport.init(InitContext)");
}
/**
@@ -82,7 +101,7 @@ abstract public class ReadSupport<T> {
* @return the readContext that defines how to read the file
*/
public ReadContext init(InitContext context) {
- return init(context.getConfiguration(), context.getMergedKeyValueMetaData(), context.getFileSchema());
+ return init(context.getParquetConfiguration(), context.getMergedKeyValueMetaData(), context.getFileSchema());
}
/**
@@ -101,6 +120,24 @@ abstract public class ReadSupport<T> {
MessageType fileSchema,
ReadContext readContext);
+ /**
+ * called in {@link org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)} in the back end
+ * the returned RecordMaterializer will materialize the records and add them to the destination
+ *
+ * @param configuration the configuration
+ * @param keyValueMetaData the app specific metadata from the file
+ * @param fileSchema the schema of the file
+ * @param readContext returned by the init method
+ * @return the recordMaterializer that will materialize the records
+ */
+ public RecordMaterializer<T> prepareForRead(
+ ParquetConfiguration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema,
+ ReadContext readContext) {
+ throw new UnsupportedOperationException("Override
ReadSupport.prepareForRead(ParquetConfiguration, Map<String, String>,
MessageType, ReadContext)");
+ }
+
/**
* information to read the file
*/
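Implementations adopting the new overloads typically bridge the legacy Hadoop entry points into them, mirroring what the built-in supports in this patch do. A sketch for the example Group model:

  import java.util.Map;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.conf.HadoopParquetConfiguration;
  import org.apache.parquet.conf.ParquetConfiguration;
  import org.apache.parquet.example.data.Group;
  import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
  import org.apache.parquet.hadoop.api.InitContext;
  import org.apache.parquet.hadoop.api.ReadSupport;
  import org.apache.parquet.io.api.RecordMaterializer;
  import org.apache.parquet.schema.MessageType;

  class BridgingReadSupport extends ReadSupport<Group> {
    @Override
    public ReadContext init(InitContext context) {
      return new ReadContext(context.getFileSchema()); // no projection in this sketch
    }

    @Override
    public RecordMaterializer<Group> prepareForRead(Configuration conf,
        Map<String, String> meta, MessageType fileSchema, ReadContext ctx) {
      // legacy Hadoop entry point: wrap and delegate to the new overload
      return prepareForRead(new HadoopParquetConfiguration(conf), meta, fileSchema, ctx);
    }

    @Override
    public RecordMaterializer<Group> prepareForRead(ParquetConfiguration conf,
        Map<String, String> meta, MessageType fileSchema, ReadContext ctx) {
      return new GroupRecordConverter(ctx.getRequestedSchema());
    }
  }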
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java
index 9549d5f49..b73e102c2 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java
@@ -25,6 +25,7 @@ import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
@@ -105,6 +106,15 @@ abstract public class WriteSupport<T> {
*/
public abstract WriteContext init(Configuration configuration);
+ /**
+ * called first in the task
+ * @param configuration the job's configuration
+ * @return the information needed to write the file
+ */
+ public WriteContext init(ParquetConfiguration configuration) {
+ throw new UnsupportedOperationException("Override
WriteSupport#init(ParquetConfiguration)");
+ }
+
/**
* This will be called once per row group
* @param recordConsumer the recordConsumer to write to
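The same bridging shape applies on the write side; a sketch with an illustrative single-column schema:

  import java.util.Collections;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.conf.HadoopParquetConfiguration;
  import org.apache.parquet.conf.ParquetConfiguration;
  import org.apache.parquet.hadoop.api.WriteSupport;
  import org.apache.parquet.io.api.Binary;
  import org.apache.parquet.io.api.RecordConsumer;
  import org.apache.parquet.schema.MessageType;
  import org.apache.parquet.schema.MessageTypeParser;

  class BridgingWriteSupport extends WriteSupport<String> {
    private final MessageType schema =
        MessageTypeParser.parseMessageType("message m { required binary s; }");
    private RecordConsumer consumer;

    @Override
    public WriteContext init(Configuration conf) {
      return init(new HadoopParquetConfiguration(conf)); // bridge to the new overload
    }

    @Override
    public WriteContext init(ParquetConfiguration conf) {
      return new WriteContext(schema, Collections.<String, String>emptyMap());
    }

    @Override
    public void prepareForWrite(RecordConsumer recordConsumer) {
      this.consumer = recordConsumer;
    }

    @Override
    public void write(String record) {
      consumer.startMessage();
      consumer.startField("s", 0);
      consumer.addBinary(Binary.fromString(record));
      consumer.endField("s", 0);
      consumer.endMessage();
    }
  }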
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
index 12a67d301..a151b4fce 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.hadoop.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
@@ -111,6 +112,11 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
@Override
protected WriteSupport<Group> getWriteSupport(Configuration conf) {
+ return getWriteSupport((ParquetConfiguration) null);
+ }
+
+ @Override
+ protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
return new GroupWriteSupport(type, extraMetaData);
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
index c49b681d5..6cb4b6f7f 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
@@ -22,6 +22,8 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.api.ReadSupport;
@@ -34,6 +36,13 @@ public class GroupReadSupport extends ReadSupport<Group> {
public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(
Configuration configuration, Map<String, String> keyValueMetaData,
MessageType fileSchema) {
+ return init(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema);
+ }
+
+ @Override
+ public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(
+ ParquetConfiguration configuration, Map<String, String> keyValueMetaData,
+ MessageType fileSchema) {
String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA);
MessageType requestedProjection = getSchemaForRead(fileSchema, partialSchemaString);
return new ReadContext(requestedProjection);
@@ -46,4 +55,11 @@ public class GroupReadSupport extends ReadSupport<Group> {
return new GroupRecordConverter(readContext.getRequestedSchema());
}
+ @Override
+ public RecordMaterializer<Group> prepareForRead(ParquetConfiguration configuration,
+ Map<String, String> keyValueMetaData, MessageType fileSchema,
+ org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) {
+ return new GroupRecordConverter(readContext.getRequestedSchema());
+ }
+
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
index dfed676c9..a4d4a3f36 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
@@ -26,6 +26,8 @@ import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
@@ -41,6 +43,10 @@ public class GroupWriteSupport extends WriteSupport<Group> {
}
public static MessageType getSchema(Configuration configuration) {
+ return getSchema(new HadoopParquetConfiguration(configuration));
+ }
+
+ public static MessageType getSchema(ParquetConfiguration configuration) {
return parseMessageType(Objects.requireNonNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA));
}
@@ -68,6 +74,11 @@ public class GroupWriteSupport extends WriteSupport<Group> {
@Override
public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(Configuration configuration) {
+ return init(new HadoopParquetConfiguration(configuration));
+ }
+
+ @Override
+ public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(ParquetConfiguration configuration) {
// if present, prefer the schema passed to the constructor
if (schema == null) {
schema = getSchema(configuration);
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
index 7f39cd76c..ca524d90b 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
@@ -19,18 +19,26 @@
package org.apache.parquet.hadoop.util;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.BadConfigurationException;
+import java.util.Map;
+
public class ConfigurationUtil {
public static Class<?> getClassFromConfig(Configuration configuration, String configName, Class<?> assignableFrom) {
+ return getClassFromConfig(new HadoopParquetConfiguration(configuration), configName, assignableFrom);
+ }
+
+ public static Class<?> getClassFromConfig(ParquetConfiguration configuration, String configName, Class<?> assignableFrom) {
final String className = configuration.get(configName);
if (className == null) {
return null;
}
-
+
try {
- final Class<?> foundClass = configuration.getClassByName(className);
+ final Class<?> foundClass = configuration.getClassByName(className);
if (!assignableFrom.isAssignableFrom(foundClass)) {
throw new BadConfigurationException("class " + className + " set in
job conf at "
+ configName + " is not a subclass of " +
assignableFrom.getCanonicalName());
@@ -41,4 +49,18 @@ public class ConfigurationUtil {
}
}
+ public static Configuration createHadoopConfiguration(ParquetConfiguration conf) {
+ if (conf == null) {
+ return new Configuration();
+ }
+ if (conf instanceof HadoopParquetConfiguration) {
+ return ((HadoopParquetConfiguration) conf).getConfiguration();
+ }
+ Configuration configuration = new Configuration();
+ for (Map.Entry<String, String> entry : conf) {
+ configuration.set(entry.getKey(), entry.getValue());
+ }
+ return configuration;
+ }
+
}
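createHadoopConfiguration is the escape hatch back into APIs that still require the Hadoop type: a HadoopParquetConfiguration is unwrapped rather than copied, any other implementation has its entries copied into a fresh Configuration, and null yields defaults. A round-trip sketch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.conf.HadoopParquetConfiguration;
  import org.apache.parquet.hadoop.util.ConfigurationUtil;

  class ConfBridgeSketch {
    public static void main(String[] args) {
      Configuration hadoopConf = new Configuration();
      hadoopConf.set("parquet.example.key", "value"); // illustrative key
      HadoopParquetConfiguration wrapped = new HadoopParquetConfiguration(hadoopConf);
      Configuration back = ConfigurationUtil.createHadoopConfiguration(wrapped);
      System.out.println(back == hadoopConf);               // true: unwrapped, not copied
      System.out.println(back.get("parquet.example.key"));  // value
      System.out.println(ConfigurationUtil.createHadoopConfiguration(null) != null); // true
    }
  }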
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
index a46c8db21..845cafc5a 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
@@ -22,6 +22,7 @@ package org.apache.parquet.hadoop.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.CodecFactory;
public class HadoopCodecs {
@@ -33,6 +34,10 @@ public class HadoopCodecs {
return new CodecFactory(conf, sizeHint);
}
+ public static CompressionCodecFactory newFactory(ParquetConfiguration conf, int sizeHint) {
+ return new CodecFactory(conf, sizeHint);
+ }
+
public static CompressionCodecFactory newDirectFactory(Configuration conf, ByteBufferAllocator allocator, int sizeHint) {
return CodecFactory.createDirectCodecFactory(conf, allocator, sizeHint);
}
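A sketch of the new factory entry point; DEFAULT_PAGE_SIZE stands in for whatever size hint the caller already passes to the Hadoop overload:

  import org.apache.parquet.compression.CompressionCodecFactory;
  import org.apache.parquet.conf.HadoopParquetConfiguration;
  import org.apache.parquet.hadoop.ParquetWriter;
  import org.apache.parquet.hadoop.util.HadoopCodecs;

  class CodecSketch {
    static CompressionCodecFactory factory() {
      // same CodecFactory underneath; only the configuration type differs
      return HadoopCodecs.newFactory(new HadoopParquetConfiguration(), ParquetWriter.DEFAULT_PAGE_SIZE);
    }
  }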
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
index 199b774c4..6e669ed8b 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
@@ -29,6 +29,8 @@ import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
/**
* Serialization utils copied from:
@@ -70,8 +72,22 @@ public final class SerializationUtil {
* @return the read object, or null if key is not present in conf
* @throws IOException if there is an error while reading
*/
- @SuppressWarnings("unchecked")
public static <T> T readObjectFromConfAsBase64(String key, Configuration conf) throws IOException {
+ return readObjectFromConfAsBase64(key, new HadoopParquetConfiguration(conf));
+ }
+
+ /**
+ * Reads an object (that was written using
+ * {@link #writeObjectToConfAsBase64}) from a configuration
+ *
+ * @param key for the configuration
+ * @param conf to read from
+ * @param <T> the Java type of the deserialized object
+ * @return the read object, or null if key is not present in conf
+ * @throws IOException if there is an error while reading
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T readObjectFromConfAsBase64(String key, ParquetConfiguration conf) throws IOException {
String b64 = conf.get(key);
if (b64 == null) {
return null;
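A round-trip sketch; note that in this patch only the read helper gains an overload, so the write side still takes the Hadoop type:

  import java.io.IOException;
  import java.util.HashMap;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.conf.HadoopParquetConfiguration;
  import org.apache.parquet.hadoop.util.SerializationUtil;

  class SerializationSketch {
    public static void main(String[] args) throws IOException {
      Configuration hadoopConf = new Configuration();
      HashMap<String, String> payload = new HashMap<>();
      payload.put("k", "v");
      SerializationUtil.writeObjectToConfAsBase64("my.key", payload, hadoopConf); // Hadoop-typed write
      HashMap<String, String> roundTripped = SerializationUtil.readObjectFromConfAsBase64(
          "my.key", new HadoopParquetConfiguration(hadoopConf));                  // new read overload
      System.out.println("v".equals(roundTripped.get("k"))); // true
    }
  }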
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
b/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
index 074d2e8b6..e8e032c9c 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/DirectWriterTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -86,6 +87,11 @@ public class DirectWriterTest {
@Override
public WriteContext init(Configuration configuration) {
+ return init((ParquetConfiguration) null);
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration configuration) {
return new WriteContext(type, metadata);
}
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
index 862ae672c..a7c200283 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java
@@ -22,6 +22,8 @@ package org.apache.parquet.crypto.propertiesfactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.crypto.EncryptionPropertiesFactory;
import org.apache.parquet.crypto.ParquetCipher;
import org.apache.parquet.example.data.Group;
@@ -203,6 +205,11 @@ public class SchemaControlEncryptionTest {
@Override
public WriteContext init(Configuration conf) {
+ return init(new HadoopParquetConfiguration(conf));
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration conf) {
WriteContext writeContext = super.init(conf);
MessageType schema = writeContext.getSchema();
List<ColumnDescriptor> columns = schema.getColumns();
@@ -219,6 +226,10 @@ public class SchemaControlEncryptionTest {
}
private void setMetadata(ColumnDescriptor column, Configuration conf) {
+ setMetadata(column, new HadoopParquetConfiguration(conf));
+ }
+
+ private void setMetadata(ColumnDescriptor column, ParquetConfiguration conf) {
String columnShortName = column.getPath()[column.getPath().length - 1];
if (cryptoMetadata.containsKey(columnShortName) &&
cryptoMetadata.get(columnShortName).get("columnKeyMetaData") != null) {
@@ -242,6 +253,11 @@ public class SchemaControlEncryptionTest {
@Override
protected WriteSupport<Group> getWriteSupport(Configuration conf) {
+ return getWriteSupport((ParquetConfiguration) null);
+ }
+
+ @Override
+ protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
return new CryptoGroupWriteSupport();
}
}
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
index e3ddb3e4c..d64e8d975 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
@@ -330,7 +330,7 @@ public class TestEncryptionOptions {
Path rootPath = new Path(temporaryFolder.getRoot().getPath());
LOG.info("======== testWriteReadEncryptedParquetFiles {} ========",
rootPath.toString());
byte[] AADPrefix = AAD_PREFIX_STRING.getBytes(StandardCharsets.UTF_8);
- // Write using various encryption configuraions
+ // Write using various encryption configurations
testWriteEncryptedParquetFiles(rootPath, DATA);
// Read using various decryption configurations.
testReadEncryptedParquetFiles(rootPath, DATA);
diff --git
a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java
b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java
index 50f9ebcc3..cca1a91f2 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.pig.LoadPushDown.RequiredFieldList;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -61,6 +63,14 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
* @return the pig schema requested by the user or null if none.
*/
static Schema getPigSchema(Configuration configuration) {
+ return getPigSchema(new HadoopParquetConfiguration(configuration));
+ }
+
+ /**
+ * @param configuration the configuration
+ * @return the pig schema requested by the user or null if none.
+ */
+ static Schema getPigSchema(ParquetConfiguration configuration) {
return parsePigSchema(configuration.get(PARQUET_PIG_SCHEMA));
}
@@ -69,9 +79,17 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
* @return List of required fields from pushProjection
*/
static RequiredFieldList getRequiredFields(Configuration configuration) {
+ return getRequiredFields(new HadoopParquetConfiguration(configuration));
+ }
+
+ /**
+ * @param configuration configuration
+ * @return List of required fields from pushProjection
+ */
+ static RequiredFieldList getRequiredFields(ParquetConfiguration configuration) {
String requiredFieldString = configuration.get(PARQUET_PIG_REQUIRED_FIELDS);
- if(requiredFieldString == null) {
+ if (requiredFieldString == null) {
return null;
}
@@ -154,9 +172,9 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
@Override
public ReadContext init(InitContext initContext) {
- Schema pigSchema = getPigSchema(initContext.getConfiguration());
- RequiredFieldList requiredFields = getRequiredFields(initContext.getConfiguration());
- boolean columnIndexAccess = initContext.getConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
+ Schema pigSchema = getPigSchema(initContext.getParquetConfiguration());
+ RequiredFieldList requiredFields = getRequiredFields(initContext.getParquetConfiguration());
+ boolean columnIndexAccess = initContext.getParquetConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
if (pigSchema == null) {
return new ReadContext(initContext.getFileSchema());
@@ -174,9 +192,17 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
Map<String, String> keyValueMetaData,
MessageType fileSchema,
ReadContext readContext) {
+ return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext);
+ }
+
+ @Override
+ public RecordMaterializer<Tuple> prepareForRead(
+ ParquetConfiguration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema,
+ ReadContext readContext) {
MessageType requestedSchema = readContext.getRequestedSchema();
Schema requestedPigSchema = getPigSchema(configuration);
-
if (requestedPigSchema == null) {
throw new ParquetDecodingException("Missing Pig schema: ParquetLoader
sets the schema in the job conf");
}
diff --git
a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
index 68a7d7d22..fd1bb39cd 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
@@ -26,6 +26,8 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
@@ -82,6 +84,11 @@ public class TupleWriteSupport extends WriteSupport<Tuple> {
@Override
public WriteContext init(Configuration configuration) {
+ return init(new HadoopParquetConfiguration(configuration));
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration configuration) {
Map<String, String> extraMetaData = new HashMap<String, String>();
new PigMetaData(rootPigSchema).addToMetaData(extraMetaData);
return new WriteContext(rootSchema, extraMetaData);
diff --git
a/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java
b/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java
index ff4bd87d6..1c21044cb 100644
---
a/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java
+++
b/parquet-pig/src/test/java/org/apache/parquet/pig/TestTupleRecordConsumer.java
@@ -33,6 +33,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -175,7 +176,7 @@ public class TestTupleRecordConsumer {
private <T> TupleWriteSupport newTupleWriter(String pigSchemaString,
RecordMaterializer<T> recordConsumer) throws ParserException {
TupleWriteSupport tupleWriter =
TupleWriteSupport.fromPigSchema(pigSchemaString);
- tupleWriter.init(null);
+ tupleWriter.init((ParquetConfiguration) null);
tupleWriter.prepareForWrite(
new ConverterConsumer(recordConsumer.getRootConverter(),
tupleWriter.getParquetSchema())
);
diff --git
a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
index c8e36aded..12c3373f5 100644
---
a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
+++
b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.logging.Level;
import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.NonSpillableDataBag;
@@ -130,8 +131,8 @@ public class TupleConsumerPerfTest {
TupleReadSupport tupleReadSupport = new TupleReadSupport();
Map<String, String> pigMetaData = pigMetaData(pigSchemaString);
MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchemaString));
- ReadContext init = tupleReadSupport.init(null, pigMetaData, schema);
- RecordMaterializer<Tuple> recordConsumer = tupleReadSupport.prepareForRead(null, pigMetaData, schema, init);
+ ReadContext init = tupleReadSupport.init((ParquetConfiguration) null, pigMetaData, schema);
+ RecordMaterializer<Tuple> recordConsumer = tupleReadSupport.prepareForRead((ParquetConfiguration) null, pigMetaData, schema, init);
RecordReader<Tuple> recordReader = columnIO.getRecordReader(columns,
recordConsumer);
// TODO: put this back
// if (DEBUG) {
@@ -156,7 +157,7 @@ public class TupleConsumerPerfTest {
private static void write(MemPageStore memPageStore, ColumnWriteStoreV1 columns, MessageType schema, String pigSchemaString) throws ExecException, ParserException {
MessageColumnIO columnIO = newColumnFactory(pigSchemaString);
TupleWriteSupport tupleWriter =
TupleWriteSupport.fromPigSchema(pigSchemaString);
- tupleWriter.init(null);
+ tupleWriter.init((ParquetConfiguration) null);
tupleWriter.prepareForWrite(columnIO.getRecordWriter(columns));
write(memPageStore, tupleWriter, 10000);
write(memPageStore, tupleWriter, 10000);
diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
index 8383fbc75..da51788f2 100644
---
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
+++
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
@@ -25,6 +25,8 @@ import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.BadConfigurationException;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.ParquetDecodingException;
@@ -68,7 +70,7 @@ class ProtoMessageConverter extends GroupConverter {
}
};
- protected final Configuration conf;
+ protected final ParquetConfiguration conf;
protected final Converter[] converters;
protected final ParentValueContainer parent;
protected final Message.Builder myBuilder;
@@ -88,8 +90,16 @@ class ProtoMessageConverter extends GroupConverter {
this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema,
extraMetadata);
}
+ ProtoMessageConverter(ParquetConfiguration conf, ParentValueContainer pvc, Class<? extends Message> protoClass, GroupType parquetSchema, Map<String, String> extraMetadata) {
+ this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, extraMetadata);
+ }
+
// For usage in message arrays
ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map<String, String> extraMetadata) {
+ this(new HadoopParquetConfiguration(conf), pvc, builder, parquetSchema, extraMetadata);
+ }
+
+ ProtoMessageConverter(ParquetConfiguration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map<String, String> extraMetadata) {
if (pvc == null) {
throw new IllegalStateException("Missing parent value container");
}
@@ -141,7 +151,12 @@ class ProtoMessageConverter extends GroupConverter {
private Converter dummyScalarConverter(ParentValueContainer pvc,
Type parquetField, Configuration conf,
Map<String, String> extraMetadata) {
+ return dummyScalarConverter(pvc, parquetField, new HadoopParquetConfiguration(conf), extraMetadata);
+ }
+ private Converter dummyScalarConverter(ParentValueContainer pvc,
+ Type parquetField, ParquetConfiguration conf,
+ Map<String, String> extraMetadata) {
if (parquetField.isPrimitive()) {
PrimitiveType primitiveType = parquetField.asPrimitiveType();
PrimitiveType.PrimitiveTypeName primitiveTypeName =
primitiveType.getPrimitiveTypeName();
diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
index a85b4ef55..f1bd64466 100644
---
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
+++
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.proto;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -120,8 +121,15 @@ public class ProtoParquetWriter<T extends MessageOrBuilder> extends ParquetWrite
return this;
}
- protected WriteSupport<T> getWriteSupport(Configuration conf) {
- return (WriteSupport<T>) ProtoParquetWriter.writeSupport(protoMessage);
- }
+ @Override
+ protected WriteSupport<T> getWriteSupport(Configuration conf) {
+ return getWriteSupport((ParquetConfiguration) null);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
+ return (WriteSupport<T>) ProtoParquetWriter.writeSupport(protoMessage);
+ }
}
}
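Custom writer builders follow the same pattern: override both getWriteSupport overloads and funnel the Hadoop one through the adapter (or, when the conf is ignored, forward null as ProtoParquetWriter does). A sketch with illustrative names, reusing the BridgingWriteSupport sketch from the WriteSupport hunk above:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.conf.HadoopParquetConfiguration;
  import org.apache.parquet.conf.ParquetConfiguration;
  import org.apache.parquet.hadoop.ParquetWriter;
  import org.apache.parquet.hadoop.api.WriteSupport;
  import org.apache.parquet.io.OutputFile;

  class MyWriterBuilder extends ParquetWriter.Builder<String, MyWriterBuilder> {
    MyWriterBuilder(OutputFile file) {
      super(file);
    }

    @Override
    protected MyWriterBuilder self() {
      return this;
    }

    @Override
    protected WriteSupport<String> getWriteSupport(Configuration conf) {
      return getWriteSupport(new HadoopParquetConfiguration(conf)); // bridge the legacy path
    }

    @Override
    protected WriteSupport<String> getWriteSupport(ParquetConfiguration conf) {
      return new BridgingWriteSupport(); // illustrative write support
    }
  }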
diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
index 78edf70d2..6343992e5 100644
---
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
+++
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
@@ -21,6 +21,8 @@ package org.apache.parquet.proto;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.RecordMaterializer;
@@ -59,7 +61,7 @@ public class ProtoReadSupport<T extends Message> extends ReadSupport<T> {
@Override
public ReadContext init(InitContext context) {
- String requestedProjectionString = context.getConfiguration().get(PB_REQUESTED_PROJECTION);
+ String requestedProjectionString = context.getParquetConfiguration().get(PB_REQUESTED_PROJECTION);
if (requestedProjectionString != null &&
!requestedProjectionString.trim().isEmpty()) {
MessageType requestedProjection =
getSchemaForRead(context.getFileSchema(), requestedProjectionString);
@@ -74,6 +76,11 @@ public class ProtoReadSupport<T extends Message> extends ReadSupport<T> {
@Override
public RecordMaterializer<T> prepareForRead(Configuration configuration,
Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext
readContext) {
+ return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext);
+ }
+
+ @Override
+ public RecordMaterializer<T> prepareForRead(ParquetConfiguration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
String headerProtoClass = keyValueMetaData.get(PB_CLASS);
String configuredProtoClass = configuration.get(PB_CLASS);
diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java
index 75a67f12c..4ddf23d6e 100644
---
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java
+++
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java
@@ -22,6 +22,7 @@ package org.apache.parquet.proto;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.schema.MessageType;
import java.util.Collections;
@@ -54,6 +55,11 @@ public class ProtoRecordConverter<T extends MessageOrBuilder> extends ProtoMessa
reusedBuilder = getBuilder();
}
+ public ProtoRecordConverter(ParquetConfiguration conf, Class<? extends Message> protoclass, MessageType parquetSchema, Map<String, String> extraMetadata) {
+ super(conf, new SkipParentValueContainer(), protoclass, parquetSchema, extraMetadata);
+ reusedBuilder = getBuilder();
+ }
+
public ProtoRecordConverter(Configuration conf, Message.Builder builder,
MessageType parquetSchema, Map<String, String> extraMetadata) {
super(conf, new SkipParentValueContainer(), builder, parquetSchema,
extraMetadata);
reusedBuilder = getBuilder();
diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
index dd77ca6b6..63640d330 100644
---
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
+++
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
@@ -21,6 +21,8 @@ package org.apache.parquet.proto;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
@@ -32,6 +34,10 @@ class ProtoRecordMaterializer<T extends MessageOrBuilder> extends RecordMaterial
private final ProtoRecordConverter<T> root;
public ProtoRecordMaterializer(Configuration conf, MessageType
requestedSchema, Class<? extends Message> protobufClass, Map<String, String>
metadata) {
+ this(new HadoopParquetConfiguration(conf), requestedSchema, protobufClass, metadata);
+ }
+
+ public ProtoRecordMaterializer(ParquetConfiguration conf, MessageType requestedSchema, Class<? extends Message> protobufClass, Map<String, String> metadata) {
this.root = new ProtoRecordConverter<T>(conf, protobufClass,
requestedSchema, metadata);
}
diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
index c3570323f..a6a779d07 100644
---
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
+++
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
@@ -25,6 +25,8 @@ import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -81,9 +83,19 @@ public class ProtoSchemaConverter {
* Instantiate a schema converter to get the parquet schema corresponding to protobuf classes.
* Returns instances that are not specs compliant and limited to 5 levels of recursion depth.
*
- * @param config Hadoop configuration object to parrse parquetSpecsCompliant and maxRecursion settings.
+ * @param config Hadoop configuration object to parse parquetSpecsCompliant and maxRecursion settings.
*/
public ProtoSchemaConverter(Configuration config) {
+ this(new HadoopParquetConfiguration(config));
+ }
+
+ /**
+ * Instantiate a schema converter to get the parquet schema corresponding to protobuf classes.
+ * Returns instances that are not specs compliant and limited to 5 levels of recursion depth.
+ *
+ * @param config Parquet configuration object to parse parquetSpecsCompliant and maxRecursion settings.
+ */
+ public ProtoSchemaConverter(ParquetConfiguration config) {
this(
config.getBoolean(ProtoWriteSupport.PB_SPECS_COMPLIANT_WRITE, false),
config.getInt(PB_MAX_RECURSION, 5));
diff --git
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
index f15511062..b13acd2a5 100644
---
a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
+++
b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
@@ -23,6 +23,8 @@ import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.twitter.elephantbird.util.Protobufs;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.BadConfigurationException;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.InvalidRecordException;
@@ -118,6 +120,11 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
@Override
public WriteContext init(Configuration configuration) {
+ return init(new HadoopParquetConfiguration(configuration));
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration configuration) {
Map<String, String> extraMetaData = new HashMap<>();
diff --git
a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
index 0c3fe440d..baa3ddb92 100644
---
a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
+++
b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
@@ -18,6 +18,8 @@ package org.apache.parquet.hadoop.thrift;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.thrift.TBase;
import com.twitter.elephantbird.pig.util.ThriftToPig;
@@ -40,14 +42,22 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class";
private static final Logger LOG =
LoggerFactory.getLogger(AbstractThriftWriteSupport.class);
- private static Configuration conf;
+ private static ParquetConfiguration conf;
public static void setGenericThriftClass(Configuration configuration,
Class<?> thriftClass) {
+ setGenericThriftClass(new HadoopParquetConfiguration(configuration), thriftClass);
+ }
+
+ public static void setGenericThriftClass(ParquetConfiguration configuration, Class<?> thriftClass) {
conf = configuration;
configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName());
}
- public static Class getGenericThriftClass(Configuration configuration) {
+ public static Class<?> getGenericThriftClass(Configuration configuration) {
+ return getGenericThriftClass(new HadoopParquetConfiguration(configuration));
+ }
+
+ public static Class<?> getGenericThriftClass(ParquetConfiguration configuration) {
final String thriftClassName = configuration.get(PARQUET_THRIFT_CLASS);
if (thriftClassName == null) {
throw new BadConfigurationException("the thrift class conf is missing in
job conf at " + PARQUET_THRIFT_CLASS);
@@ -111,9 +121,14 @@ public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
@Override
public WriteContext init(Configuration configuration) {
+ return init(new HadoopParquetConfiguration(configuration));
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration configuration) {
conf = configuration;
if (writeContext == null) {
- init(getGenericThriftClass(configuration));
+ init((Class<T>) getGenericThriftClass(configuration));
}
return writeContext;
}
diff --git
a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
index c1ece9fcf..60dfc12e7 100644
---
a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
+++
b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
@@ -16,6 +16,8 @@
package org.apache.parquet.hadoop.thrift;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
@@ -25,14 +27,22 @@ import org.apache.parquet.thrift.struct.ThriftType.StructType;
public class TBaseWriteSupport<T extends TBase<?, ?>> extends AbstractThriftWriteSupport<T> {
- private static Configuration conf;
+ private static ParquetConfiguration conf;
public static <U extends TBase<?,?>> void setThriftClass(Configuration configuration, Class<U> thriftClass) {
+ setThriftClass(new HadoopParquetConfiguration(configuration), thriftClass);
+ }
+
+ public static <U extends TBase<?,?>> void setThriftClass(ParquetConfiguration configuration, Class<U> thriftClass) {
conf = configuration;
AbstractThriftWriteSupport.setGenericThriftClass(configuration,
thriftClass);
}
public static Class<? extends TBase<?,?>> getThriftClass(Configuration configuration) {
+ return getThriftClass(new HadoopParquetConfiguration(configuration));
+ }
+
+ public static Class<? extends TBase<?,?>> getThriftClass(ParquetConfiguration configuration) {
return (Class<? extends TBase<?,?>>)AbstractThriftWriteSupport.getGenericThriftClass(configuration);
}
diff --git
a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
index 6b9d75d98..6c9311f4e 100644
---
a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
+++
b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
@@ -24,6 +24,8 @@ import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -57,6 +59,10 @@ public class ThriftBytesWriteSupport extends
WriteSupport<BytesWritable> {
}
public static Class<TProtocolFactory> getTProtocolFactoryClass(Configuration conf) {
+ return getTProtocolFactoryClass(new HadoopParquetConfiguration(conf));
+ }
+
+ public static Class<TProtocolFactory> getTProtocolFactoryClass(ParquetConfiguration conf) {
final String tProtocolClassName = conf.get(PARQUET_PROTOCOL_CLASS);
if (tProtocolClassName == null) {
throw new BadConfigurationException("the protocol class conf is missing in job conf at " + PARQUET_PROTOCOL_CLASS);
@@ -80,7 +86,7 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
private StructType thriftStruct;
private ParquetWriteProtocol parquetWriteProtocol;
private final FieldIgnoredHandler errorHandler;
- private Configuration configuration;
+ private ParquetConfiguration configuration;
public ThriftBytesWriteSupport() {
this.buffered = true;
@@ -106,6 +112,15 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
Class<? extends TBase<?, ?>> thriftClass,
boolean buffered,
FieldIgnoredHandler errorHandler) {
+ this(new HadoopParquetConfiguration(configuration), protocolFactory, thriftClass, buffered, errorHandler);
+ }
+
+ public ThriftBytesWriteSupport(
+ ParquetConfiguration configuration,
+ TProtocolFactory protocolFactory,
+ Class<? extends TBase<?, ?>> thriftClass,
+ boolean buffered,
+ FieldIgnoredHandler errorHandler) {
super();
this.configuration = configuration;
this.protocolFactory = protocolFactory;
@@ -124,6 +139,11 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
@Override
public WriteContext init(Configuration configuration) {
+ return init(new HadoopParquetConfiguration(configuration));
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration configuration) {
this.configuration = configuration;
if (this.protocolFactory == null) {
try {
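A hedged construction sketch for the ParquetConfiguration-based constructor added above; the generated Thrift class and the null errorHandler are illustrative assumptions, so the call is left commented:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.conf.HadoopParquetConfiguration;
    import org.apache.thrift.protocol.TCompactProtocol;

    public class ThriftBytesWriteSupportSketch {
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        // Assumed call shape, mirroring the new constructor's parameters:
        // new ThriftBytesWriteSupport(
        //     new HadoopParquetConfiguration(hadoopConf),
        //     new TCompactProtocol.Factory(), // any TProtocolFactory
        //     MyThriftRecord.class,           // hypothetical generated class
        //     true,                           // buffered
        //     null);                          // errorHandler, assumed optional here
      }
    }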
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
index 2375a6df6..bd9530b82 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -25,6 +25,8 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.protocol.TProtocol;
@@ -111,6 +113,10 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
   }
   public static FieldProjectionFilter getFieldProjectionFilter(Configuration conf) {
+    return getFieldProjectionFilter(new HadoopParquetConfiguration(conf));
+  }
+
+  public static FieldProjectionFilter getFieldProjectionFilter(ParquetConfiguration conf) {
     String deprecated = conf.get(THRIFT_COLUMN_FILTER_KEY);
     String strict = conf.get(STRICT_THRIFT_COLUMN_FILTER_KEY);
@@ -155,7 +161,7 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
   @Override
   public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
-    final Configuration configuration = context.getConfiguration();
+    final ParquetConfiguration configuration = context.getParquetConfiguration();
     final MessageType fileMessageType = context.getFileSchema();
     MessageType requestedProjection = fileMessageType;
     String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA);
@@ -185,9 +191,14 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
     return new ReadContext(schemaForRead);
   }
-  @SuppressWarnings("unchecked")
   protected MessageType getProjectedSchema(Configuration configuration,
                                            FieldProjectionFilter fieldProjectionFilter) {
+    return getProjectedSchema(new HadoopParquetConfiguration(configuration), fieldProjectionFilter);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected MessageType getProjectedSchema(ParquetConfiguration configuration,
+                                           FieldProjectionFilter fieldProjectionFilter) {
     return new ThriftSchemaConverter(configuration, fieldProjectionFilter)
         .convert((Class<TBase<?, ?>>)thriftClass);
   }
@@ -200,8 +211,12 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
         .convert((Class<TBase<?, ?>>)thriftClass);
   }
-  @SuppressWarnings("unchecked")
   private void initThriftClassFromMultipleFiles(Map<String, Set<String>> fileMetadata, Configuration conf) throws ClassNotFoundException {
+    initThriftClassFromMultipleFiles(fileMetadata, new HadoopParquetConfiguration(conf));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void initThriftClassFromMultipleFiles(Map<String, Set<String>> fileMetadata, ParquetConfiguration conf) throws ClassNotFoundException {
     if (thriftClass != null) {
       return;
     }
@@ -216,8 +231,12 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
     thriftClass = (Class<T>)Class.forName(className);
   }
-  @SuppressWarnings("unchecked")
   private void initThriftClass(ThriftMetaData metadata, Configuration conf) throws ClassNotFoundException {
+    initThriftClass(metadata, new HadoopParquetConfiguration(conf));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void initThriftClass(ThriftMetaData metadata, ParquetConfiguration conf) throws ClassNotFoundException {
     if (thriftClass != null) {
       return;
     }
@@ -254,10 +273,45 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
         configuration);
   }
-  @SuppressWarnings("unchecked")
+  @Override
+  public RecordMaterializer<T> prepareForRead(ParquetConfiguration configuration,
+                                              Map<String, String> keyValueMetaData, MessageType fileSchema,
+                                              org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) {
+    ThriftMetaData thriftMetaData = ThriftMetaData.fromExtraMetaData(keyValueMetaData);
+    try {
+      initThriftClass(thriftMetaData, configuration);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Cannot find Thrift object class for metadata: " + thriftMetaData, e);
+    }
+
+    // if there was no metadata in the file, get it from the requested class
+    if (thriftMetaData == null) {
+      thriftMetaData = ThriftMetaData.fromThriftClass(thriftClass);
+    }
+
+    String converterClassName = configuration.get(RECORD_CONVERTER_CLASS_KEY, RECORD_CONVERTER_DEFAULT);
+    return getRecordConverterInstance(converterClassName, thriftClass,
+        readContext.getRequestedSchema(), thriftMetaData.getDescriptor(),
+        configuration);
+  }
+
   private static <T> ThriftRecordConverter<T> getRecordConverterInstance(
       String converterClassName, Class<T> thriftClass,
       MessageType requestedSchema, StructType descriptor, Configuration conf) {
+    return getRecordConverterInstance(converterClassName, thriftClass, requestedSchema, descriptor, conf, Configuration.class);
+  }
+
+  private static <T> ThriftRecordConverter<T> getRecordConverterInstance(
+      String converterClassName, Class<T> thriftClass,
+      MessageType requestedSchema, StructType descriptor, ParquetConfiguration conf) {
+    return getRecordConverterInstance(converterClassName, thriftClass, requestedSchema, descriptor, conf, ParquetConfiguration.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T, CONF> ThriftRecordConverter<T> getRecordConverterInstance(
+      String converterClassName, Class<T> thriftClass,
+      MessageType requestedSchema, StructType descriptor, CONF conf, Class<CONF> confClass) {
+
     Class<ThriftRecordConverter<T>> converterClass;
     try {
       converterClass = (Class<ThriftRecordConverter<T>>) Class.forName(converterClassName);
@@ -269,14 +323,14 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
       // first try the new version that accepts a Configuration
       try {
         Constructor<ThriftRecordConverter<T>> constructor =
-            converterClass.getConstructor(Class.class, MessageType.class, StructType.class, Configuration.class);
+            converterClass.getConstructor(Class.class, MessageType.class, StructType.class, confClass);
         return constructor.newInstance(thriftClass, requestedSchema, descriptor, conf);
       } catch (IllegalAccessException | NoSuchMethodException e) {
         // try the other constructor pattern
       }
       Constructor<ThriftRecordConverter<T>> constructor =
-          converterClass.getConstructor(Class.class, MessageType.class, StructType.class);
+          converterClass.getConstructor(Class.class, MessageType.class, StructType.class);
       return constructor.newInstance(thriftClass, requestedSchema, descriptor);
     } catch (InstantiationException | InvocationTargetException e) {
       throw new RuntimeException("Failed to construct Thrift converter class: " + converterClassName, e);
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java
index a9864ff81..2ac4fccf1 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftWriteSupport.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.hadoop.thrift;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -68,6 +69,11 @@ public class ThriftWriteSupport<T extends TBase<?,?>> extends WriteSupport<T> {
     return this.writeSupport.init(configuration);
   }
+  @Override
+  public WriteContext init(ParquetConfiguration configuration) {
+    return this.writeSupport.init(configuration);
+  }
+
   @Override
   public void prepareForWrite(RecordConsumer recordConsumer) {
     this.writeSupport.prepareForWrite(recordConsumer);
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
index ba48b3779..f129f36b7 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
@@ -22,6 +22,8 @@ package org.apache.parquet.thrift;
 import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TField;
 import org.apache.thrift.protocol.TList;
@@ -500,6 +502,11 @@ public class ParquetWriteProtocol extends ParquetProtocol {
   public ParquetWriteProtocol(
       Configuration configuration, RecordConsumer recordConsumer,
       MessageColumnIO schema, StructType thriftType) {
+    this(new HadoopParquetConfiguration(configuration), recordConsumer, schema, thriftType);
+  }
+
+  public ParquetWriteProtocol(
+      ParquetConfiguration configuration, RecordConsumer recordConsumer, MessageColumnIO schema, StructType thriftType) {
     this.recordConsumer = recordConsumer;
     if (configuration != null) {
       this.writeThreeLevelList = configuration.getBoolean(
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java
index 78fc4a88f..fa31a5c78 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/TBaseRecordConverter.java
@@ -19,6 +19,8 @@
 package org.apache.parquet.thrift;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocol;
@@ -38,10 +40,15 @@ public class TBaseRecordConverter<T extends TBase<?,?>> extends ThriftRecordConv
    */
   @Deprecated
   public TBaseRecordConverter(final Class<T> thriftClass, MessageType requestedParquetSchema, StructType thriftType) {
-    this(thriftClass, requestedParquetSchema, thriftType, null);
+    this(thriftClass, requestedParquetSchema, thriftType, (HadoopParquetConfiguration) null);
   }
+  @SuppressWarnings("unused")
   public TBaseRecordConverter(final Class<T> thriftClass, MessageType requestedParquetSchema, StructType thriftType, Configuration conf) {
+    this(thriftClass, requestedParquetSchema, thriftType, new HadoopParquetConfiguration(conf));
+  }
+
+  public TBaseRecordConverter(final Class<T> thriftClass, MessageType requestedParquetSchema, StructType thriftType, ParquetConfiguration conf) {
     super(new ThriftReader<T>() {
       @Override
       public T readOneRecord(TProtocol protocol) throws TException {
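The cast in the deprecated constructor above is load-bearing: once overloads exist for both Configuration and ParquetConfiguration, a bare null no longer selects a unique overload. A standalone illustration of the language rule (not Parquet code):

    public class OverloadCastDemo {
      OverloadCastDemo(String s, Integer conf) {}
      OverloadCastDemo(String s, Long conf) {}

      public static void main(String[] args) {
        // new OverloadCastDemo("x", null);        // does not compile: ambiguous
        new OverloadCastDemo("x", (Integer) null); // the cast selects an overload
      }
    }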
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
index 3244b3211..d0649212f 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TField;
 import org.apache.thrift.protocol.TList;
@@ -843,7 +845,7 @@ public class ThriftRecordConverter<T> extends RecordMaterializer<T> {
    */
   @Deprecated
   public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType) {
-    this(thriftReader, name, requestedParquetSchema, thriftType, null);
+    this(thriftReader, name, requestedParquetSchema, thriftType, (ParquetConfiguration) null);
   }
   /**
@@ -855,6 +857,17 @@ public class ThriftRecordConverter<T> extends RecordMaterializer<T> {
    * @param conf a Configuration
    */
   public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType, Configuration conf) {
+    this(thriftReader, name, requestedParquetSchema, thriftType, new HadoopParquetConfiguration(conf));
+  }
+
+  /**
+   * @param thriftReader the class responsible for instantiating the final object and reading from the protocol
+   * @param name the name of that type (the thrift class's simple name)
+   * @param requestedParquetSchema the schema for the incoming columnar events
+   * @param thriftType the thrift type descriptor
+   * @param conf a ParquetConfiguration
+   */
+  public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageType requestedParquetSchema, ThriftType.StructType thriftType, ParquetConfiguration conf) {
     super();
     this.thriftReader = thriftReader;
     this.protocol = new ParquetReadProtocol();
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
index f915a6ed1..c32df8147 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
@@ -24,6 +24,8 @@ import java.util.Objects;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ShouldNeverHappenException;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
@@ -80,6 +82,11 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
   private ThriftSchemaConvertVisitor(FieldProjectionFilter fieldProjectionFilter, boolean doProjection,
                                      boolean keepOneOfEachUnion, Configuration configuration) {
+    this(fieldProjectionFilter, doProjection, keepOneOfEachUnion, new HadoopParquetConfiguration(configuration));
+  }
+
+  private ThriftSchemaConvertVisitor(FieldProjectionFilter fieldProjectionFilter, boolean doProjection,
+                                     boolean keepOneOfEachUnion, ParquetConfiguration configuration) {
     this.fieldProjectionFilter = Objects.requireNonNull(fieldProjectionFilter, "fieldProjectionFilter cannot be null");
     this.doProjection = doProjection;
@@ -105,6 +112,11 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
   public static MessageType convert(StructType struct, FieldProjectionFilter filter, boolean keepOneOfEachUnion,
                                     Configuration conf) {
+    return convert(struct, filter, keepOneOfEachUnion, new HadoopParquetConfiguration(conf));
+  }
+
+  public static MessageType convert(StructType struct, FieldProjectionFilter filter, boolean keepOneOfEachUnion,
+                                    ParquetConfiguration conf) {
     State state = new State(new FieldsPath(), REPEATED, "ParquetSchema");
     ConvertedField converted = struct.accept(
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
index 61cfd4c1b..662450217 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TEnum;
 import org.apache.thrift.TUnion;
@@ -52,19 +54,28 @@ import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 public class ThriftSchemaConverter {
   private final FieldProjectionFilter fieldProjectionFilter;
-  private Configuration conf;
+  private ParquetConfiguration conf;
   public ThriftSchemaConverter() {
     this(FieldProjectionFilter.ALL_COLUMNS);
   }
   public ThriftSchemaConverter(Configuration configuration) {
+    this(new HadoopParquetConfiguration(configuration));
+  }
+
+  public ThriftSchemaConverter(ParquetConfiguration configuration) {
     this();
     conf = configuration;
   }
   public ThriftSchemaConverter(
       Configuration configuration, FieldProjectionFilter fieldProjectionFilter) {
+    this(new HadoopParquetConfiguration(configuration), fieldProjectionFilter);
+  }
+
+  public ThriftSchemaConverter(
+      ParquetConfiguration configuration, FieldProjectionFilter fieldProjectionFilter) {
     this(fieldProjectionFilter);
     conf = configuration;
   }
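A construction-side sketch for the converter after this change; the commented calls assume only the constructors shown above and the ALL_COLUMNS filter already used by the no-arg constructor:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.conf.HadoopParquetConfiguration;
    import org.apache.parquet.conf.ParquetConfiguration;

    public class SchemaConverterSketch {
      public static void main(String[] args) {
        ParquetConfiguration conf = new HadoopParquetConfiguration(new Configuration());
        // Equivalent entry points; the Hadoop overload wraps and delegates:
        // new ThriftSchemaConverter(new Configuration());
        // new ThriftSchemaConverter(conf, FieldProjectionFilter.ALL_COLUMNS);
      }
    }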
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/pig/TupleToThriftWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/pig/TupleToThriftWriteSupport.java
index d75206124..f582e7c1a 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/pig/TupleToThriftWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/pig/TupleToThriftWriteSupport.java
@@ -19,6 +19,8 @@
 package org.apache.parquet.thrift.pig;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.pig.data.Tuple;
 import org.apache.thrift.TBase;
@@ -51,9 +53,14 @@ public class TupleToThriftWriteSupport extends WriteSupport<Tuple> {
     return "thrift";
   }
-  @SuppressWarnings({"rawtypes", "unchecked"})
   @Override
   public WriteContext init(Configuration configuration) {
+    return init(new HadoopParquetConfiguration(configuration));
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public WriteContext init(ParquetConfiguration configuration) {
     try {
       Class<?> clazz = configuration.getClassByName(className).asSubclass(TBase.class);
       thriftWriteSupport = new ThriftWriteSupport(clazz);
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
index 1311d7690..98f22d12a 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
 import com.twitter.elephantbird.thrift.test.TestMapInList;
 import com.twitter.elephantbird.thrift.test.TestNameSet;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.junit.ComparisonFailure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -708,7 +709,7 @@ public class TestParquetWriteProtocol {
     MessageType schema = new PigSchemaConverter().convert(pigSchema);
     LOG.info("{}", schema);
     TupleWriteSupport tupleWriteSupport = new TupleWriteSupport(pigSchema);
-    tupleWriteSupport.init(null);
+    tupleWriteSupport.init((ParquetConfiguration) null);
     tupleWriteSupport.prepareForWrite(recordConsumer);
     final Tuple pigTuple = thriftToPig.getPigTuple(a);
     LOG.info("{}", pigTuple);
diff --git a/pom.xml b/pom.xml
index 7fe6bf28f..0f8228689 100644
--- a/pom.xml
+++ b/pom.xml
@@ -540,6 +540,8 @@
           </excludeModules>
           <excludes>
             <exclude>${shade.prefix}</exclude>
+            <exclude>org.apache.parquet.hadoop.CodecFactory</exclude> <!-- change field type from Configuration to ParquetConfiguration -->
+            <exclude>org.apache.parquet.hadoop.ParquetReader</exclude> <!-- change field type from Configuration to ParquetConfiguration -->
             <exclude>org.apache.parquet.thrift.projection.deprecated.PathGlobPattern</exclude>
             <!-- japicmp is overly aggressive on interface types in signatures; a type was changed to a supertype, but this still triggers it -->
             <exclude>org.apache.parquet.hadoop.ColumnChunkPageWriteStore</exclude>