This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 238b9a2d9 PARQUET-2385: Allow user to specify CodecFactory for 
ParquetWriter (#1203)
238b9a2d9 is described below

commit 238b9a2d9dd7adf1b026b86110657e5c3268c9a3
Author: Atour <[email protected]>
AuthorDate: Mon Dec 4 06:58:08 2023 +0100

    PARQUET-2385: Allow user to specify CodecFactory for ParquetWriter (#1203)
---
 .../org/apache/parquet/avro/TestReadWrite.java     | 127 +++++++++++++++------
 .../org/apache/parquet/hadoop/ParquetWriter.java   |  49 +++++++-
 2 files changed, 136 insertions(+), 40 deletions(-)

diff --git 
a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 024860b4d..d0b2a7dba 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -55,12 +55,14 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.conf.HadoopParquetConfiguration;
 import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.conf.PlainParquetConfiguration;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
 import org.apache.parquet.io.LocalInputFile;
 import org.apache.parquet.io.LocalOutputFile;
 import org.apache.parquet.io.api.Binary;
@@ -76,41 +78,88 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class TestReadWrite {
 
+  enum Converters {
+    COMPATIBLE(true),
+    NEW(false);
+
+    final boolean compat;
+
+    Converters(boolean compatible) {
+      compat = compatible;
+    }
+
+    public boolean isCompatible() {
+      return compat;
+    }
+  }
+
+  enum FileLocation {
+    LOCAL,
+    HADOOP
+  }
+
+  enum ConfigurationType {
+    HADOOP_CONFIGURATION,
+    HADOOP_PARQUET_INTERFACE,
+    PLAIN_PARQUET_INTERFACE
+  }
+
+  enum CodecFactory {
+    IMPLICIT,
+    EXPLICIT
+  }
+
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
     Object[][] data = new Object[][] {
-      {true, false, false, false}, // use the old converters with hadoop config
-      {true, false, true, false}, // use the old converters with parquet 
config interface
-      {false, false, false, false}, // use the new converters with hadoop 
config
-      {false, true, false, false}, // use a local disk location with hadoop 
config
-      {false, false, true, false}, // use the new converters with parquet 
config interface
-      {false, true, true, false}, // use a local disk location with parquet 
config interface
-      {false, false, true, true}, // use the new converters with plain parquet 
config
-      {false, true, true, true}
-    }; // use a local disk location with plain parquet config
+      {Converters.COMPATIBLE, FileLocation.HADOOP, 
ConfigurationType.HADOOP_CONFIGURATION, CodecFactory.IMPLICIT},
+      {
+        Converters.COMPATIBLE,
+        FileLocation.HADOOP,
+        ConfigurationType.HADOOP_PARQUET_INTERFACE,
+        CodecFactory.IMPLICIT
+      },
+      {Converters.NEW, FileLocation.HADOOP, 
ConfigurationType.HADOOP_CONFIGURATION, CodecFactory.IMPLICIT},
+      {Converters.NEW, FileLocation.LOCAL, 
ConfigurationType.HADOOP_CONFIGURATION, CodecFactory.IMPLICIT},
+      {Converters.NEW, FileLocation.HADOOP, 
ConfigurationType.HADOOP_PARQUET_INTERFACE, CodecFactory.IMPLICIT},
+      {Converters.NEW, FileLocation.LOCAL, 
ConfigurationType.HADOOP_PARQUET_INTERFACE, CodecFactory.IMPLICIT},
+      {Converters.NEW, FileLocation.HADOOP, 
ConfigurationType.PLAIN_PARQUET_INTERFACE, CodecFactory.IMPLICIT},
+      {Converters.NEW, FileLocation.LOCAL, 
ConfigurationType.PLAIN_PARQUET_INTERFACE, CodecFactory.IMPLICIT},
+      {
+        Converters.COMPATIBLE,
+        FileLocation.HADOOP,
+        ConfigurationType.HADOOP_PARQUET_INTERFACE,
+        CodecFactory.EXPLICIT
+      },
+      {Converters.NEW, FileLocation.HADOOP, 
ConfigurationType.HADOOP_PARQUET_INTERFACE, CodecFactory.EXPLICIT},
+      {Converters.NEW, FileLocation.LOCAL, 
ConfigurationType.HADOOP_PARQUET_INTERFACE, CodecFactory.EXPLICIT},
+      {Converters.NEW, FileLocation.HADOOP, 
ConfigurationType.PLAIN_PARQUET_INTERFACE, CodecFactory.EXPLICIT},
+      {Converters.NEW, FileLocation.LOCAL, 
ConfigurationType.PLAIN_PARQUET_INTERFACE, CodecFactory.EXPLICIT}
+    };
     return Arrays.asList(data);
   }
 
-  private final boolean compat;
-  private final boolean local;
-  private final boolean confInterface;
-  private final boolean plainConf;
+  private final Converters converter;
+  private final FileLocation fileLocation;
+  private final ConfigurationType conf;
+  private final CodecFactory codecType;
+
   private final Configuration testConf = new Configuration();
   private final ParquetConfiguration hadoopConfWithInterface = new 
HadoopParquetConfiguration();
   private final ParquetConfiguration plainParquetConf = new 
PlainParquetConfiguration();
 
-  public TestReadWrite(boolean compat, boolean local, boolean confInterface, 
boolean plainConf) {
-    this.compat = compat;
-    this.local = local;
-    this.confInterface = confInterface;
-    this.plainConf = plainConf;
-    this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+  public TestReadWrite(Converters converter, FileLocation fileLocation, 
ConfigurationType conf, CodecFactory codecs) {
+    this.converter = converter;
+    this.fileLocation = fileLocation;
+    this.conf = conf;
+    this.codecType = codecs;
+    this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, 
converter.isCompatible());
     this.testConf.setBoolean("parquet.avro.add-list-element-records", false);
     this.testConf.setBoolean("parquet.avro.write-old-list-structure", false);
-    
this.hadoopConfWithInterface.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, 
compat);
+    
this.hadoopConfWithInterface.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, 
converter.isCompatible());
     
this.hadoopConfWithInterface.setBoolean("parquet.avro.add-list-element-records",
 false);
     
this.hadoopConfWithInterface.setBoolean("parquet.avro.write-old-list-structure",
 false);
-    this.plainParquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, 
compat);
+    this.plainParquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, 
converter.isCompatible());
     this.plainParquetConf.setBoolean("parquet.avro.add-list-element-records", 
false);
     this.plainParquetConf.setBoolean("parquet.avro.write-old-list-structure", 
false);
   }
@@ -402,7 +451,7 @@ public class TestReadWrite {
       nextRecord = reader.read();
     }
 
-    Object expectedEnumSymbol = compat
+    Object expectedEnumSymbol = converter.isCompatible()
         ? "a"
         : new GenericData.EnumSymbol(schema.getField("myenum").schema(), "a");
 
@@ -879,19 +928,21 @@ public class TestReadWrite {
 
   private ParquetWriter<GenericRecord> writer(String file, Schema schema) 
throws IOException {
     AvroParquetWriter.Builder<GenericRecord> writerBuilder;
-    if (local) {
+    if (fileLocation == FileLocation.LOCAL) {
       writerBuilder = AvroParquetWriter.<GenericRecord>builder(new 
LocalOutputFile(Paths.get(file)))
           .withSchema(schema);
     } else {
       writerBuilder =
           AvroParquetWriter.<GenericRecord>builder(new 
Path(file)).withSchema(schema);
     }
-    if (confInterface) {
-      if (plainConf) {
-        return writerBuilder.withConf(hadoopConfWithInterface).build();
-      } else {
-        return writerBuilder.withConf(plainParquetConf).build();
-      }
+    if (codecType == CodecFactory.EXPLICIT) {
+      writerBuilder =
+          
writerBuilder.withCodecFactory(HadoopCodecs.newFactory(ParquetProperties.DEFAULT_PAGE_SIZE));
+    }
+    if (conf == ConfigurationType.PLAIN_PARQUET_INTERFACE) {
+      return writerBuilder.withConf(hadoopConfWithInterface).build();
+    } else if (conf == ConfigurationType.HADOOP_PARQUET_INTERFACE) {
+      return writerBuilder.withConf(plainParquetConf).build();
     } else {
       return writerBuilder.withConf(testConf).build();
     }
@@ -899,18 +950,20 @@ public class TestReadWrite {
 
   private ParquetReader<GenericRecord> reader(String file) throws IOException {
     AvroParquetReader.Builder<GenericRecord> readerBuilder;
-    if (local) {
+    if (fileLocation == FileLocation.LOCAL) {
       readerBuilder = AvroParquetReader.<GenericRecord>builder(new 
LocalInputFile(Paths.get(file)))
           .withDataModel(GenericData.get());
     } else {
       return new AvroParquetReader<>(testConf, new Path(file));
     }
-    if (confInterface) {
-      if (plainConf) {
-        return readerBuilder.withConf(hadoopConfWithInterface).build();
-      } else {
-        return readerBuilder.withConf(plainParquetConf).build();
-      }
+    if (codecType == CodecFactory.EXPLICIT) {
+      readerBuilder = (AvroParquetReader.Builder<GenericRecord>)
+          
readerBuilder.withCodecFactory(HadoopCodecs.newFactory(ParquetProperties.DEFAULT_PAGE_SIZE));
+    }
+    if (conf == ConfigurationType.PLAIN_PARQUET_INTERFACE) {
+      return readerBuilder.withConf(hadoopConfWithInterface).build();
+    } else if (conf == ConfigurationType.HADOOP_PARQUET_INTERFACE) {
+      return readerBuilder.withConf(plainParquetConf).build();
     } else {
       return readerBuilder.withConf(testConf).build();
     }
@@ -920,6 +973,6 @@ public class TestReadWrite {
    * Return a String or Utf8 depending on whether compatibility is on
    */
   public CharSequence str(String value) {
-    return compat ? value : new Utf8(value);
+    return converter.isCompatible() ? value : new Utf8(value);
   }
 }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 37da3de86..fc9db5872 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -350,7 +350,33 @@ public class ParquetWriter<T> implements Closeable {
       ParquetProperties encodingProps,
       FileEncryptionProperties encryptionProperties)
       throws IOException {
+    this(
+        file,
+        mode,
+        writeSupport,
+        compressionCodecName,
+        new CodecFactory(conf, encodingProps.getPageSizeThreshold()),
+        rowGroupSize,
+        validating,
+        conf,
+        maxPaddingSize,
+        encodingProps,
+        encryptionProperties);
+  }
 
+  ParquetWriter(
+      OutputFile file,
+      ParquetFileWriter.Mode mode,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      CompressionCodecFactory codecFactory,
+      long rowGroupSize,
+      boolean validating,
+      ParquetConfiguration conf,
+      int maxPaddingSize,
+      ParquetProperties encodingProps,
+      FileEncryptionProperties encryptionProperties)
+      throws IOException {
     WriteSupport.WriteContext writeContext = writeSupport.init(conf);
     MessageType schema = writeContext.getSchema();
 
@@ -375,7 +401,7 @@ public class ParquetWriter<T> implements Closeable {
         encryptionProperties);
     fileWriter.start();
 
-    this.codecFactory = new CodecFactory(conf, 
encodingProps.getPageSizeThreshold());
+    this.codecFactory = codecFactory;
     CompressionCodecFactory.BytesInputCompressor compressor = 
codecFactory.getCompressor(compressionCodecName);
     this.writer = new InternalParquetRecordWriter<T>(
         fileWriter,
@@ -437,6 +463,7 @@ public class ParquetWriter<T> implements Closeable {
     private FileEncryptionProperties encryptionProperties = null;
     private ParquetConfiguration conf = null;
     private ParquetFileWriter.Mode mode;
+    private CompressionCodecFactory codecFactory = null;
     private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
     private long rowGroupSize = DEFAULT_BLOCK_SIZE;
     private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
@@ -517,6 +544,18 @@ public class ParquetWriter<T> implements Closeable {
       return self();
     }
 
+    /**
+     * Set the {@link CompressionCodecFactory codec factory} used by the
+     * constructed writer.
+     *
+     * @param codecFactory a {@link CompressionCodecFactory}
+     * @return this builder for method chaining.
+     */
+    public SELF withCodecFactory(CompressionCodecFactory codecFactory) {
+      this.codecFactory = codecFactory;
+      return self();
+    }
+
     /**
      * Set the {@link FileEncryptionProperties file encryption properties} 
used by the
      * constructed writer.
@@ -836,6 +875,10 @@ public class ParquetWriter<T> implements Closeable {
       if (conf == null) {
         conf = new HadoopParquetConfiguration();
       }
+      ParquetProperties encodingProps = encodingPropsBuilder.build();
+      if (codecFactory == null) {
+        codecFactory = new CodecFactory(conf, 
encodingProps.getPageSizeThreshold());
+      }
       if (file != null) {
         return new ParquetWriter<>(
             file,
@@ -846,7 +889,7 @@ public class ParquetWriter<T> implements Closeable {
             enableValidation,
             conf,
             maxPaddingSize,
-            encodingPropsBuilder.build(),
+            encodingProps,
             encryptionProperties);
       } else {
         return new ParquetWriter<>(
@@ -858,7 +901,7 @@ public class ParquetWriter<T> implements Closeable {
             enableValidation,
             conf,
             maxPaddingSize,
-            encodingPropsBuilder.build(),
+            encodingProps,
             encryptionProperties);
       }
     }

Reply via email to