This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
     new 238b9a2d9 PARQUET-2385: Allow user to specify CodecFactory for ParquetWriter (#1203)
238b9a2d9 is described below
commit 238b9a2d9dd7adf1b026b86110657e5c3268c9a3
Author: Atour <[email protected]>
AuthorDate: Mon Dec 4 06:58:08 2023 +0100
PARQUET-2385: Allow user to specify CodecFactory for ParquetWriter (#1203)
---
.../org/apache/parquet/avro/TestReadWrite.java | 127 +++++++++++++++------
.../org/apache/parquet/hadoop/ParquetWriter.java | 49 +++++++-
2 files changed, 136 insertions(+), 40 deletions(-)
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 024860b4d..d0b2a7dba 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -55,12 +55,14 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.io.LocalOutputFile;
import org.apache.parquet.io.api.Binary;
@@ -76,41 +78,88 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestReadWrite {
+ enum Converters {
+ COMPATIBLE(true),
+ NEW(false);
+
+ final boolean compat;
+
+ Converters(boolean compatible) {
+ compat = compatible;
+ }
+
+ public boolean isCompatible() {
+ return compat;
+ }
+ }
+
+ enum FileLocation {
+ LOCAL,
+ HADOOP
+ }
+
+ enum ConfigurationType {
+ HADOOP_CONFIGURATION,
+ HADOOP_PARQUET_INTERFACE,
+ PLAIN_PARQUET_INTERFACE
+ }
+
+ enum CodecFactory {
+ IMPLICIT,
+ EXPLICIT
+ }
+
@Parameterized.Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
-      {true, false, false, false}, // use the old converters with hadoop config
-      {true, false, true, false}, // use the old converters with parquet config interface
-      {false, false, false, false}, // use the new converters with hadoop config
-      {false, true, false, false}, // use a local disk location with hadoop config
-      {false, false, true, false}, // use the new converters with parquet config interface
-      {false, true, true, false}, // use a local disk location with parquet config interface
-      {false, false, true, true}, // use the new converters with plain parquet config
-      {false, true, true, true}
-    }; // use a local disk location with plain parquet config
+      {Converters.COMPATIBLE, FileLocation.HADOOP, ConfigurationType.HADOOP_CONFIGURATION, CodecFactory.IMPLICIT},
+      {
+        Converters.COMPATIBLE,
+        FileLocation.HADOOP,
+        ConfigurationType.HADOOP_PARQUET_INTERFACE,
+        CodecFactory.IMPLICIT
+      },
+      {Converters.NEW, FileLocation.HADOOP, ConfigurationType.HADOOP_CONFIGURATION, CodecFactory.IMPLICIT},
+      {Converters.NEW, FileLocation.LOCAL, ConfigurationType.HADOOP_CONFIGURATION, CodecFactory.IMPLICIT},
+      {Converters.NEW, FileLocation.HADOOP, ConfigurationType.HADOOP_PARQUET_INTERFACE, CodecFactory.IMPLICIT},
+      {Converters.NEW, FileLocation.LOCAL, ConfigurationType.HADOOP_PARQUET_INTERFACE, CodecFactory.IMPLICIT},
+      {Converters.NEW, FileLocation.HADOOP, ConfigurationType.PLAIN_PARQUET_INTERFACE, CodecFactory.IMPLICIT},
+      {Converters.NEW, FileLocation.LOCAL, ConfigurationType.PLAIN_PARQUET_INTERFACE, CodecFactory.IMPLICIT},
+      {
+        Converters.COMPATIBLE,
+        FileLocation.HADOOP,
+        ConfigurationType.HADOOP_PARQUET_INTERFACE,
+        CodecFactory.EXPLICIT
+      },
+      {Converters.NEW, FileLocation.HADOOP, ConfigurationType.HADOOP_PARQUET_INTERFACE, CodecFactory.EXPLICIT},
+      {Converters.NEW, FileLocation.LOCAL, ConfigurationType.HADOOP_PARQUET_INTERFACE, CodecFactory.EXPLICIT},
+      {Converters.NEW, FileLocation.HADOOP, ConfigurationType.PLAIN_PARQUET_INTERFACE, CodecFactory.EXPLICIT},
+      {Converters.NEW, FileLocation.LOCAL, ConfigurationType.PLAIN_PARQUET_INTERFACE, CodecFactory.EXPLICIT}
+    };
return Arrays.asList(data);
}
- private final boolean compat;
- private final boolean local;
- private final boolean confInterface;
- private final boolean plainConf;
+ private final Converters converter;
+ private final FileLocation fileLocation;
+ private final ConfigurationType conf;
+ private final CodecFactory codecType;
+
private final Configuration testConf = new Configuration();
   private final ParquetConfiguration hadoopConfWithInterface = new HadoopParquetConfiguration();
   private final ParquetConfiguration plainParquetConf = new PlainParquetConfiguration();
-  public TestReadWrite(boolean compat, boolean local, boolean confInterface, boolean plainConf) {
-    this.compat = compat;
-    this.local = local;
-    this.confInterface = confInterface;
-    this.plainConf = plainConf;
-    this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+  public TestReadWrite(Converters converter, FileLocation fileLocation, ConfigurationType conf, CodecFactory codecs) {
+    this.converter = converter;
+    this.fileLocation = fileLocation;
+    this.conf = conf;
+    this.codecType = codecs;
+    this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, converter.isCompatible());
     this.testConf.setBoolean("parquet.avro.add-list-element-records", false);
     this.testConf.setBoolean("parquet.avro.write-old-list-structure", false);
-    this.hadoopConfWithInterface.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+    this.hadoopConfWithInterface.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, converter.isCompatible());
     this.hadoopConfWithInterface.setBoolean("parquet.avro.add-list-element-records", false);
     this.hadoopConfWithInterface.setBoolean("parquet.avro.write-old-list-structure", false);
-    this.plainParquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+    this.plainParquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, converter.isCompatible());
     this.plainParquetConf.setBoolean("parquet.avro.add-list-element-records", false);
     this.plainParquetConf.setBoolean("parquet.avro.write-old-list-structure", false);
   }
@@ -402,7 +451,7 @@ public class TestReadWrite {
nextRecord = reader.read();
}
- Object expectedEnumSymbol = compat
+ Object expectedEnumSymbol = converter.isCompatible()
? "a"
: new GenericData.EnumSymbol(schema.getField("myenum").schema(), "a");
@@ -879,19 +928,21 @@ public class TestReadWrite {
   private ParquetWriter<GenericRecord> writer(String file, Schema schema) throws IOException {
     AvroParquetWriter.Builder<GenericRecord> writerBuilder;
-    if (local) {
+    if (fileLocation == FileLocation.LOCAL) {
       writerBuilder = AvroParquetWriter.<GenericRecord>builder(new LocalOutputFile(Paths.get(file)))
           .withSchema(schema);
     } else {
       writerBuilder = AvroParquetWriter.<GenericRecord>builder(new Path(file)).withSchema(schema);
     }
-    if (confInterface) {
-      if (plainConf) {
-        return writerBuilder.withConf(hadoopConfWithInterface).build();
-      } else {
-        return writerBuilder.withConf(plainParquetConf).build();
-      }
+    if (codecType == CodecFactory.EXPLICIT) {
+      writerBuilder =
+          writerBuilder.withCodecFactory(HadoopCodecs.newFactory(ParquetProperties.DEFAULT_PAGE_SIZE));
+    }
+    if (conf == ConfigurationType.PLAIN_PARQUET_INTERFACE) {
+      return writerBuilder.withConf(hadoopConfWithInterface).build();
+    } else if (conf == ConfigurationType.HADOOP_PARQUET_INTERFACE) {
+      return writerBuilder.withConf(plainParquetConf).build();
     } else {
       return writerBuilder.withConf(testConf).build();
     }
@@ -899,18 +950,20 @@ public class TestReadWrite {
   private ParquetReader<GenericRecord> reader(String file) throws IOException {
     AvroParquetReader.Builder<GenericRecord> readerBuilder;
-    if (local) {
+    if (fileLocation == FileLocation.LOCAL) {
       readerBuilder = AvroParquetReader.<GenericRecord>builder(new LocalInputFile(Paths.get(file)))
           .withDataModel(GenericData.get());
     } else {
       return new AvroParquetReader<>(testConf, new Path(file));
     }
-    if (confInterface) {
-      if (plainConf) {
-        return readerBuilder.withConf(hadoopConfWithInterface).build();
-      } else {
-        return readerBuilder.withConf(plainParquetConf).build();
-      }
+    if (codecType == CodecFactory.EXPLICIT) {
+      readerBuilder = (AvroParquetReader.Builder<GenericRecord>)
+          readerBuilder.withCodecFactory(HadoopCodecs.newFactory(ParquetProperties.DEFAULT_PAGE_SIZE));
+    }
+    if (conf == ConfigurationType.PLAIN_PARQUET_INTERFACE) {
+      return readerBuilder.withConf(hadoopConfWithInterface).build();
+    } else if (conf == ConfigurationType.HADOOP_PARQUET_INTERFACE) {
+      return readerBuilder.withConf(plainParquetConf).build();
     } else {
       return readerBuilder.withConf(testConf).build();
     }
@@ -920,6 +973,6 @@ public class TestReadWrite {
* Return a String or Utf8 depending on whether compatibility is on
*/
public CharSequence str(String value) {
- return compat ? value : new Utf8(value);
+ return converter.isCompatible() ? value : new Utf8(value);
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 37da3de86..fc9db5872 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -350,7 +350,33 @@ public class ParquetWriter<T> implements Closeable {
ParquetProperties encodingProps,
FileEncryptionProperties encryptionProperties)
throws IOException {
+ this(
+ file,
+ mode,
+ writeSupport,
+ compressionCodecName,
+ new CodecFactory(conf, encodingProps.getPageSizeThreshold()),
+ rowGroupSize,
+ validating,
+ conf,
+ maxPaddingSize,
+ encodingProps,
+ encryptionProperties);
+ }
+ ParquetWriter(
+ OutputFile file,
+ ParquetFileWriter.Mode mode,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ CompressionCodecFactory codecFactory,
+ long rowGroupSize,
+ boolean validating,
+ ParquetConfiguration conf,
+ int maxPaddingSize,
+ ParquetProperties encodingProps,
+ FileEncryptionProperties encryptionProperties)
+ throws IOException {
WriteSupport.WriteContext writeContext = writeSupport.init(conf);
MessageType schema = writeContext.getSchema();
@@ -375,7 +401,7 @@ public class ParquetWriter<T> implements Closeable {
encryptionProperties);
fileWriter.start();
-    this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
+ this.codecFactory = codecFactory;
CompressionCodecFactory.BytesInputCompressor compressor =
codecFactory.getCompressor(compressionCodecName);
this.writer = new InternalParquetRecordWriter<T>(
fileWriter,
@@ -437,6 +463,7 @@ public class ParquetWriter<T> implements Closeable {
private FileEncryptionProperties encryptionProperties = null;
private ParquetConfiguration conf = null;
private ParquetFileWriter.Mode mode;
+ private CompressionCodecFactory codecFactory = null;
private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
private long rowGroupSize = DEFAULT_BLOCK_SIZE;
private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
@@ -517,6 +544,18 @@ public class ParquetWriter<T> implements Closeable {
return self();
}
+ /**
+ * Set the {@link CompressionCodecFactory codec factory} used by the
+ * constructed writer.
+ *
+ * @param codecFactory a {@link CompressionCodecFactory}
+ * @return this builder for method chaining.
+ */
+ public SELF withCodecFactory(CompressionCodecFactory codecFactory) {
+ this.codecFactory = codecFactory;
+ return self();
+ }
+
/**
    * Set the {@link FileEncryptionProperties file encryption properties} used by the
* constructed writer.
@@ -836,6 +875,10 @@ public class ParquetWriter<T> implements Closeable {
if (conf == null) {
conf = new HadoopParquetConfiguration();
}
+ ParquetProperties encodingProps = encodingPropsBuilder.build();
+ if (codecFactory == null) {
+      codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
+ }
if (file != null) {
return new ParquetWriter<>(
file,
@@ -846,7 +889,7 @@ public class ParquetWriter<T> implements Closeable {
enableValidation,
conf,
maxPaddingSize,
- encodingPropsBuilder.build(),
+ encodingProps,
encryptionProperties);
} else {
return new ParquetWriter<>(
@@ -858,7 +901,7 @@ public class ParquetWriter<T> implements Closeable {
enableValidation,
conf,
maxPaddingSize,
- encodingPropsBuilder.build(),
+ encodingProps,
encryptionProperties);
}
}