This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 19f284355 PARQUET-2413: Support configurable extraMetadata in ParquetWriter (#1241)
19f284355 is described below

commit 19f284355847696fa254c789ab93c42db9af5982
Author: Claire McGinty <[email protected]>
AuthorDate: Sun Jan 28 02:27:10 2024 -0500

    PARQUET-2413: Support configurable extraMetadata in ParquetWriter (#1241)
---
 .../apache/parquet/column/ParquetProperties.java   | 15 ++++++
 .../org/apache/parquet/hadoop/ParquetWriter.java   | 43 +++++++++++++---
 .../hadoop/example/ExampleParquetWriter.java       | 14 +++--
 .../apache/parquet/hadoop/TestParquetWriter.java   | 59 ++++++++++++++++++++++
 4 files changed, 115 insertions(+), 16 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 7bf4009ee..5152d5b07 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -20,6 +20,8 @@ package org.apache.parquet.column;
 
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalDouble;
 import java.util.OptionalLong;
@@ -113,6 +115,7 @@ public class ParquetProperties {
   private final int pageRowCountLimit;
   private final boolean pageWriteChecksumEnabled;
   private final boolean enableByteStreamSplit;
+  private final Map<String, String> extraMetaData;
 
   private ParquetProperties(Builder builder) {
     this.pageSizeThreshold = builder.pageSize;
@@ -139,6 +142,7 @@ public class ParquetProperties {
     this.pageRowCountLimit = builder.pageRowCountLimit;
     this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
     this.enableByteStreamSplit = builder.enableByteStreamSplit;
+    this.extraMetaData = builder.extraMetaData;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -293,6 +297,10 @@ public class ParquetProperties {
     return numBloomFilterCandidates.getValue(column);
   }
 
+  public Map<String, String> getExtraMetaData() {
+    return extraMetaData;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -342,6 +350,7 @@ public class ParquetProperties {
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
     private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
     private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
+    private Map<String, String> extraMetaData = new HashMap<>();
 
     private Builder() {
       enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
@@ -373,6 +382,7 @@ public class ParquetProperties {
       this.numBloomFilterCandidates = ColumnProperty.<Integer>builder(toCopy.numBloomFilterCandidates);
       this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
       this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
+      this.extraMetaData = toCopy.extraMetaData;
     }
 
     /**
@@ -584,6 +594,11 @@ public class ParquetProperties {
       return this;
     }
 
+    public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+      this.extraMetaData = extraMetaData;
+      return this;
+    }
+
     public ParquetProperties build() {
       ParquetProperties properties = new ParquetProperties(this);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index fc9db5872..1838d1db4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -20,6 +20,8 @@ package org.apache.parquet.hadoop;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ParquetProperties;
@@ -403,15 +405,29 @@ public class ParquetWriter<T> implements Closeable {
 
     this.codecFactory = codecFactory;
     CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName);
+
+    final Map<String, String> extraMetadata;
+    if (encodingProps.getExtraMetaData() == null
+        || encodingProps.getExtraMetaData().isEmpty()) {
+      extraMetadata = writeContext.getExtraMetaData();
+    } else {
+      extraMetadata = new HashMap<>(writeContext.getExtraMetaData());
+
+      encodingProps.getExtraMetaData().forEach((metadataKey, metadataValue) -> {
+        if (metadataKey.equals(OBJECT_MODEL_NAME_PROP)) {
+          throw new IllegalArgumentException("Cannot overwrite metadata key " + OBJECT_MODEL_NAME_PROP
+              + ". Please use another key name.");
+        }
+
+        if (extraMetadata.put(metadataKey, metadataValue) != null) {
+          throw new IllegalArgumentException(
+              "Duplicate metadata key " + metadataKey + ". Please use another key name.");
+        }
+      });
+    }
+
     this.writer = new InternalParquetRecordWriter<T>(
-        fileWriter,
-        writeSupport,
-        schema,
-        writeContext.getExtraMetaData(),
-        rowGroupSize,
-        compressor,
-        validating,
-        encodingProps);
+        fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps);
   }
 
   public void write(T object) throws IOException {
@@ -849,6 +865,17 @@ public class ParquetWriter<T> implements Closeable {
       return self();
     }
 
+    /**
+     * Sets additional metadata entries to be included in the file footer.
+     *
+     * @param extraMetaData a Map of additional stringly-typed metadata entries
+     * @return this builder for method chaining
+     */
+    public SELF withExtraMetaData(Map<String, String> extraMetaData) {
+      encodingPropsBuilder.withExtraMetaData(extraMetaData);
+      return self();
+    }
+
     /**
      * Set a property that will be available to the read path. For writers that use a Hadoop
      * configuration, this is the recommended way to add configuration values.
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
index 23df1faa3..e6b71a49d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -19,7 +19,6 @@
 package org.apache.parquet.hadoop.example;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -98,7 +97,6 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
 
   public static class Builder extends ParquetWriter.Builder<Group, Builder> {
     private MessageType type = null;
-    private Map<String, String> extraMetaData = new HashMap<String, String>();
 
     private Builder(Path file) {
       super(file);
@@ -113,11 +111,6 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
       return this;
     }
 
-    public Builder withExtraMetaData(Map<String, String> extraMetaData) {
-      this.extraMetaData = extraMetaData;
-      return this;
-    }
-
     @Override
     protected Builder self() {
       return this;
@@ -130,7 +123,12 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
 
     @Override
     protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
-      return new GroupWriteSupport(type, extraMetaData);
+      return new GroupWriteSupport(type);
+    }
+
+    @Override
+    public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+      return super.withExtraMetaData(extraMetaData);
     }
   }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index b5dedf665..fa9ee865d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -36,6 +36,7 @@ import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
@@ -407,6 +408,64 @@ public class TestParquetWriter {
     testParquetFileNumberOfBlocks(1, 1, 3);
   }
 
+  @Test
+  public void testExtraMetaData() throws Exception {
+    final Configuration conf = new Configuration();
+    final File testDir = temp.newFile();
+    testDir.delete();
+
+    final MessageType schema = parseMessageType("message test { required int32 int32_field; }");
+    GroupWriteSupport.setSchema(schema, conf);
+    final SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+    for (WriterVersion version : WriterVersion.values()) {
+      final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+      final ParquetWriter<Group> writer = ExampleParquetWriter.builder(new TestOutputFile(filePath, conf))
+          .withConf(conf)
+          .withExtraMetaData(ImmutableMap.of("simple-key", "some-value-1", "nested.key", "some-value-2"))
+          .build();
+      for (int i = 0; i < 1000; i++) {
+        writer.write(f.newGroup().append("int32_field", 32));
+      }
+      writer.close();
+
+      final ParquetFileReader reader =
+          ParquetFileReader.open(HadoopInputFile.fromPath(filePath, new Configuration()));
+      assertEquals(1000, reader.readNextRowGroup().getRowCount());
+      assertEquals(
+          ImmutableMap.of(
+              "simple-key",
+              "some-value-1",
+              "nested.key",
+              "some-value-2",
+              ParquetWriter.OBJECT_MODEL_NAME_PROP,
+              "example"),
+          reader.getFileMetaData().getKeyValueMetaData());
+
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testFailsOnConflictingExtraMetaDataKey() throws Exception {
+    final Configuration conf = new Configuration();
+    final File testDir = temp.newFile();
+    testDir.delete();
+
+    final MessageType schema = parseMessageType("message test { required int32 int32_field; }");
+    GroupWriteSupport.setSchema(schema, conf);
+
+    for (WriterVersion version : WriterVersion.values()) {
+      final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+
+      Assert.assertThrows(IllegalArgumentException.class, () -> ExampleParquetWriter.builder(
+              new TestOutputFile(filePath, conf))
+          .withConf(conf)
+          .withExtraMetaData(ImmutableMap.of(ParquetWriter.OBJECT_MODEL_NAME_PROP, "some-value-3"))
+          .build());
+    }
+  }
+
   private void testParquetFileNumberOfBlocks(
       int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, int expectedNumberOfBlocks)
       throws IOException {

Reply via email to