This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8b0a19cff17 [HUDI-7342] Use BaseFileUtils to hide format-specific logic in HoodiePartitionMetadata (#10568)
8b0a19cff17 is described below
commit 8b0a19cff17bd1f4885c4cb814fab3a30df66a05
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sun Jan 28 23:42:07 2024 -0800
[HUDI-7342] Use BaseFileUtils to hide format-specific logic in HoodiePartitionMetadata (#10568)
---
.../hudi/common/model/HoodiePartitionMetadata.java | 43 +---------------------
.../org/apache/hudi/common/util/BaseFileUtils.java | 15 ++++++++
.../java/org/apache/hudi/common/util/OrcUtils.java | 18 +++++++++
.../org/apache/hudi/common/util/ParquetUtils.java | 23 ++++++++++++
4 files changed, 57 insertions(+), 42 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index ad5912ba8b9..2b63433bef4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -18,40 +18,26 @@
package org.apache.hudi.common.model;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.HoodieAvroWriteSupport;
-import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Writer;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-
/**
* The metadata that goes into the meta file in each partition.
*/
@@ -152,34 +138,7 @@ public class HoodiePartitionMetadata {
*/
private void writeMetafile(Path filePath) throws IOException {
if (format.isPresent()) {
- Schema schema = HoodieAvroUtils.getRecordKeySchema();
-
- switch (format.get()) {
- case PARQUET:
- // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
- // parameters are not important.
- MessageType type = Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
- HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(type, schema, Option.empty(), new Properties());
- try (ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
- for (String key : props.stringPropertyNames()) {
- writeSupport.addFooterMetadata(key, props.getProperty(key));
- }
- }
- break;
- case ORC:
- // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
- // parameters are not important.
- OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
- .setSchema(AvroOrcUtils.createOrcSchema(schema));
- try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
- for (String key : props.stringPropertyNames()) {
- writer.addUserMetadata(key, ByteBuffer.wrap(getUTF8Bytes(props.getProperty(key))));
- }
- }
- break;
- default:
- throw new HoodieException("Unsupported format for partition metafiles: " + format.get());
- }
+ BaseFileUtils.getInstance(format.get()).writeMetaFile(fs, filePath, props);
} else {
// Backwards compatible properties file format
FSDataOutputStream os = fs.create(filePath, true);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index 278729f3d78..488936c00f2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -33,11 +33,14 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -218,4 +221,16 @@ public abstract class BaseFileUtils {
* @return The subclass's {@link HoodieFileFormat}.
*/
public abstract HoodieFileFormat getFormat();
+
+ /**
+ * Writes properties to the meta file.
+ *
+ * @param fs {@link FileSystem} instance.
+ * @param filePath file path to write to.
+ * @param props properties to write.
+ * @throws IOException upon write error.
+ */
+ public abstract void writeMetaFile(FileSystem fs,
+ Path filePath,
+ Properties props) throws IOException;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index 63e24455c88..f43fad0e3f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -33,6 +33,7 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -42,6 +43,7 @@ import org.apache.orc.Reader;
import org.apache.orc.Reader.Options;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -51,10 +53,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.BinaryUtil.toBytes;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
/**
* Utility functions for ORC files.
@@ -272,4 +276,18 @@ public class OrcUtils extends BaseFileUtils {
throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io);
}
}
+
+ @Override
+ public void writeMetaFile(FileSystem fs, Path filePath, Properties props) throws IOException {
+ // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
+ // parameters are not important.
+ Schema schema = HoodieAvroUtils.getRecordKeySchema();
+ OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
+ .setSchema(AvroOrcUtils.createOrcSchema(schema));
+ try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
+ for (String key : props.stringPropertyNames()) {
+ writer.addUserMetadata(key, ByteBuffer.wrap(getUTF8Bytes(props.getProperty(key))));
+ }
+ }
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index 98d4e4889f1..7b0f7f4736c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
@@ -33,6 +34,7 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
@@ -40,13 +42,16 @@ import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +65,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collector;
@@ -287,6 +293,23 @@ public class ParquetUtils extends BaseFileUtils {
return rowCount;
}
+ @Override
+ public void writeMetaFile(FileSystem fs, Path filePath, Properties props) throws IOException {
+ // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
+ // parameters are not important.
+ Schema schema = HoodieAvroUtils.getRecordKeySchema();
+ MessageType type = Types.buildMessage()
+ .optional(PrimitiveType.PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+ HoodieAvroWriteSupport writeSupport =
+ new HoodieAvroWriteSupport(type, schema, Option.empty(), new Properties());
+ try (ParquetWriter writer = new ParquetWriter(
+ filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
+ for (String key : props.stringPropertyNames()) {
+ writeSupport.addFooterMetadata(key, props.getProperty(key));
+ }
+ }
+ }
+
static class RecordKeysFilterFunction implements Function<String, Boolean> {
private final Set<String> candidateKeys;