This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 19f284355 PARQUET-2413: Support configurable extraMetadata in ParquetWriter (#1241)
19f284355 is described below
commit 19f284355847696fa254c789ab93c42db9af5982
Author: Claire McGinty <[email protected]>
AuthorDate: Sun Jan 28 02:27:10 2024 -0500
PARQUET-2413: Support configurable extraMetadata in ParquetWriter (#1241)
---
.../apache/parquet/column/ParquetProperties.java | 15 ++++++
.../org/apache/parquet/hadoop/ParquetWriter.java | 43 +++++++++++++---
.../hadoop/example/ExampleParquetWriter.java | 14 +++--
.../apache/parquet/hadoop/TestParquetWriter.java | 59 ++++++++++++++++++++++
4 files changed, 115 insertions(+), 16 deletions(-)
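For context, the new builder hook added by this commit can be used roughly as follows (the output path, configuration object, and metadata keys here are illustrative, not part of the commit):

    // Sketch: attach custom key/value metadata to the file footer at write time.
    Configuration conf = new Configuration();
    ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/example.parquet"))
        .withConf(conf)
        .withExtraMetaData(ImmutableMap.of("pipeline.version", "1.2.3"))
        .build();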
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 7bf4009ee..5152d5b07 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -20,6 +20,8 @@ package org.apache.parquet.column;
import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.OptionalLong;
@@ -113,6 +115,7 @@ public class ParquetProperties {
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final boolean enableByteStreamSplit;
+ private final Map<String, String> extraMetaData;
private ParquetProperties(Builder builder) {
this.pageSizeThreshold = builder.pageSize;
@@ -139,6 +142,7 @@ public class ParquetProperties {
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.enableByteStreamSplit = builder.enableByteStreamSplit;
+ this.extraMetaData = builder.extraMetaData;
}
public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -293,6 +297,10 @@ public class ParquetProperties {
return numBloomFilterCandidates.getValue(column);
}
+ public Map<String, String> getExtraMetaData() {
+ return extraMetaData;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -342,6 +350,7 @@ public class ParquetProperties {
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
+ private Map<String, String> extraMetaData = new HashMap<>();
private Builder() {
enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
@@ -373,6 +382,7 @@ public class ParquetProperties {
this.numBloomFilterCandidates = ColumnProperty.<Integer>builder(toCopy.numBloomFilterCandidates);
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
+ this.extraMetaData = toCopy.extraMetaData;
}
/**
@@ -584,6 +594,11 @@ public class ParquetProperties {
return this;
}
+ public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+ this.extraMetaData = extraMetaData;
+ return this;
+ }
+
public ParquetProperties build() {
ParquetProperties properties = new ParquetProperties(this);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
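For completeness, the new ParquetProperties-level accessors can be exercised directly; a minimal sketch (the key and value are illustrative):

    // Sketch: extra metadata carried on ParquetProperties itself.
    ParquetProperties props = ParquetProperties.builder()
        .withExtraMetaData(ImmutableMap.of("job.id", "nightly-42"))
        .build();
    props.getExtraMetaData(); // => {job.id=nightly-42}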
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index fc9db5872..1838d1db4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -20,6 +20,8 @@ package org.apache.parquet.hadoop;
import java.io.Closeable;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
@@ -403,15 +405,29 @@ public class ParquetWriter<T> implements Closeable {
this.codecFactory = codecFactory;
CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName);
+
+ final Map<String, String> extraMetadata;
+ if (encodingProps.getExtraMetaData() == null
+ || encodingProps.getExtraMetaData().isEmpty()) {
+ extraMetadata = writeContext.getExtraMetaData();
+ } else {
+ extraMetadata = new HashMap<>(writeContext.getExtraMetaData());
+
+ encodingProps.getExtraMetaData().forEach((metadataKey, metadataValue) -> {
+ if (metadataKey.equals(OBJECT_MODEL_NAME_PROP)) {
+ throw new IllegalArgumentException("Cannot overwrite metadata key " + OBJECT_MODEL_NAME_PROP
+ + ". Please use another key name.");
+ }
+
+ if (extraMetadata.put(metadataKey, metadataValue) != null) {
+ throw new IllegalArgumentException(
+ "Duplicate metadata key " + metadataKey + ". Please use another
key name.");
+ }
+ });
+ }
+
this.writer = new InternalParquetRecordWriter<T>(
- fileWriter,
- writeSupport,
- schema,
- writeContext.getExtraMetaData(),
- rowGroupSize,
- compressor,
- validating,
- encodingProps);
+ fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps);
}
public void write(T object) throws IOException {
@@ -849,6 +865,17 @@ public class ParquetWriter<T> implements Closeable {
return self();
}
+ /**
+ * Sets additional metadata entries to be included in the file footer.
+ *
+ * @param extraMetaData a Map of additional stringly-typed metadata entries
+ * @return this builder for method chaining
+ */
+ public SELF withExtraMetaData(Map<String, String> extraMetaData) {
+ encodingPropsBuilder.withExtraMetaData(extraMetaData);
+ return self();
+ }
+
/**
* Set a property that will be available to the read path. For writers that use a Hadoop
* configuration, this is the recommended way to add configuration values.
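The merge logic above refuses to overwrite the reserved object-model key and rejects duplicate keys; a sketch of the failure mode (the builder arguments are illustrative):

    // Sketch: setting OBJECT_MODEL_NAME_PROP causes build() to fail fast.
    ExampleParquetWriter.builder(outputFile)
        .withExtraMetaData(ImmutableMap.of(ParquetWriter.OBJECT_MODEL_NAME_PROP, "custom"))
        .build(); // throws IllegalArgumentException: Cannot overwrite metadata key ...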
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
index 23df1faa3..e6b71a49d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -19,7 +19,6 @@
package org.apache.parquet.hadoop.example;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -98,7 +97,6 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
public static class Builder extends ParquetWriter.Builder<Group, Builder> {
private MessageType type = null;
- private Map<String, String> extraMetaData = new HashMap<String, String>();
private Builder(Path file) {
super(file);
@@ -113,11 +111,6 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
return this;
}
- public Builder withExtraMetaData(Map<String, String> extraMetaData) {
- this.extraMetaData = extraMetaData;
- return this;
- }
-
@Override
protected Builder self() {
return this;
@@ -130,7 +123,12 @@ public class ExampleParquetWriter extends ParquetWriter<Group> {
@Override
protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
- return new GroupWriteSupport(type, extraMetaData);
+ return new GroupWriteSupport(type);
+ }
+
+ @Override
+ public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+ return super.withExtraMetaData(extraMetaData);
}
}
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index b5dedf665..fa9ee865d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -36,6 +36,7 @@ import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
@@ -407,6 +408,64 @@ public class TestParquetWriter {
testParquetFileNumberOfBlocks(1, 1, 3);
}
+ @Test
+ public void testExtraMetaData() throws Exception {
+ final Configuration conf = new Configuration();
+ final File testDir = temp.newFile();
+ testDir.delete();
+
+ final MessageType schema = parseMessageType("message test { required int32 int32_field; }");
+ GroupWriteSupport.setSchema(schema, conf);
+ final SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+ for (WriterVersion version : WriterVersion.values()) {
+ final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+ final ParquetWriter<Group> writer = ExampleParquetWriter.builder(new TestOutputFile(filePath, conf))
+ .withConf(conf)
+ .withExtraMetaData(ImmutableMap.of("simple-key", "some-value-1",
"nested.key", "some-value-2"))
+ .build();
+ for (int i = 0; i < 1000; i++) {
+ writer.write(f.newGroup().append("int32_field", 32));
+ }
+ writer.close();
+
+ final ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(filePath, new Configuration()));
+ assertEquals(1000, reader.readNextRowGroup().getRowCount());
+ assertEquals(
+ ImmutableMap.of(
+ "simple-key",
+ "some-value-1",
+ "nested.key",
+ "some-value-2",
+ ParquetWriter.OBJECT_MODEL_NAME_PROP,
+ "example"),
+ reader.getFileMetaData().getKeyValueMetaData());
+
+ reader.close();
+ }
+ }
+
+ @Test
+ public void testFailsOnConflictingExtraMetaDataKey() throws Exception {
+ final Configuration conf = new Configuration();
+ final File testDir = temp.newFile();
+ testDir.delete();
+
+ final MessageType schema = parseMessageType("message test { required int32 int32_field; }");
+ GroupWriteSupport.setSchema(schema, conf);
+
+ for (WriterVersion version : WriterVersion.values()) {
+ final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+
+ Assert.assertThrows(IllegalArgumentException.class, () -> ExampleParquetWriter.builder(
+ new TestOutputFile(filePath, conf))
+ .withConf(conf)
+ .withExtraMetaData(ImmutableMap.of(ParquetWriter.OBJECT_MODEL_NAME_PROP, "some-value-3"))
+ .build());
+ }
+ }
+
private void testParquetFileNumberOfBlocks(
int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, int expectedNumberOfBlocks)
throws IOException {