vinothchandar commented on a change in pull request #5179:
URL: https://github.com/apache/hudi/pull/5179#discussion_r838999843
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
##########
@@ -117,30 +133,119 @@ public void trySave(int taskPartitionId) {
}
}
+ private String getMetafileExtension() {
+ // To be backwards compatible, there is no extension to the properties
file base partition metafile
+ return format.isPresent() ? format.get().getFileExtension() : "";
+ }
+
+ /**
+ * Write the partition metadata in the correct format in the given file path.
+ *
+ * @param filePath Path of the file to write
+ * @throws IOException
+ */
+ private void writeMetafile(Path filePath) throws IOException {
+ if (format.isPresent()) {
+ Schema schema = HoodieAvroUtils.getRecordKeySchema();
+
+ switch (format.get()) {
+ case PARQUET:
+ // Since we are only interested in saving metadata to the footer,
the schema, blocksizes and other
+ // parameters are not important.
+ MessageType type =
Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+ HoodieAvroWriteSupport writeSupport = new
HoodieAvroWriteSupport(type, schema, Option.empty());
+ try (ParquetWriter writer = new ParquetWriter(filePath,
writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
Review comment:
Could we use the Hudi writer classes for these files and skip the metadata
fields?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
##########
@@ -117,30 +133,119 @@ public void trySave(int taskPartitionId) {
}
}
+ private String getMetafileExtension() {
+ // To be backwards compatible, there is no extension to the properties
file base partition metafile
+ return format.isPresent() ? format.get().getFileExtension() : "";
+ }
+
+ /**
+ * Write the partition metadata in the correct format in the given file path.
+ *
+ * @param filePath Path of the file to write
+ * @throws IOException
+ */
+ private void writeMetafile(Path filePath) throws IOException {
+ if (format.isPresent()) {
+ Schema schema = HoodieAvroUtils.getRecordKeySchema();
+
+ switch (format.get()) {
+ case PARQUET:
+ // Since we are only interested in saving metadata to the footer,
the schema, blocksizes and other
+ // parameters are not important.
+ MessageType type =
Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+ HoodieAvroWriteSupport writeSupport = new
HoodieAvroWriteSupport(type, schema, Option.empty());
Review comment:
Would it cause issues if this schema is not the same as the table schema? cc
@vingov
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
##########
@@ -117,30 +133,119 @@ public void trySave(int taskPartitionId) {
}
}
+ private String getMetafileExtension() {
+ // To be backwards compatible, there is no extension to the properties
file base partition metafile
+ return format.isPresent() ? format.get().getFileExtension() : "";
+ }
+
+ /**
+ * Write the partition metadata in the correct format in the given file path.
+ *
+ * @param filePath Path of the file to write
+ * @throws IOException
+ */
+ private void writeMetafile(Path filePath) throws IOException {
+ if (format.isPresent()) {
+ Schema schema = HoodieAvroUtils.getRecordKeySchema();
+
+ switch (format.get()) {
+ case PARQUET:
+ // Since we are only interested in saving metadata to the footer,
the schema, blocksizes and other
+ // parameters are not important.
+ MessageType type =
Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+ HoodieAvroWriteSupport writeSupport = new
HoodieAvroWriteSupport(type, schema, Option.empty());
+ try (ParquetWriter writer = new ParquetWriter(filePath,
writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
+ for (String key : props.stringPropertyNames()) {
+ writeSupport.addFooterMetadata(key, props.getProperty(key));
+ }
+ }
+ break;
+ case ORC:
+ // Since we are only interested in saving metadata to the footer,
the schema, blocksizes and other
+ // parameters are not important.
+ OrcFile.WriterOptions writerOptions =
OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
+ .setSchema(AvroOrcUtils.createOrcSchema(schema));
+ try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
+ for (String key : props.stringPropertyNames()) {
+ writer.addUserMetadata(key,
ByteBuffer.wrap(props.getProperty(key).getBytes()));
+ }
+ }
+ break;
+ default:
+ throw new HoodieException("Unsupported format for partition
metafiles: " + format.get());
+ }
+ } else {
+ // Backwards compatible properties file format
+ FSDataOutputStream os = fs.create(filePath, true);
+ props.store(os, "partition metadata");
+ os.hsync();
+ os.hflush();
+ os.close();
+ }
+ }
+
/**
* Read out the metadata for this partition.
*/
public void readFromFS() throws IOException {
- FSDataInputStream is = null;
+ Option<Path> metafilePath = getPartitionMetafilePath(fs, partitionPath);
+ if (!metafilePath.isPresent()) {
+ throw new HoodieException("Partition metafile not found in path " +
partitionPath);
+ }
+
try {
- Path metaFile = new Path(partitionPath, HOODIE_PARTITION_METAFILE);
- is = fs.open(metaFile);
- props.load(is);
- } catch (IOException ioe) {
- throw new HoodieException("Error reading Hoodie partition metadata for "
+ partitionPath, ioe);
- } finally {
- if (is != null) {
- is.close();
+ BaseFileUtils reader =
BaseFileUtils.getInstance(metafilePath.toString());
+ Map<String, String> metadata = reader.readFooter(fs.getConf(), true,
metafilePath.get(), PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
+ props.clear();
+ metadata.forEach((k, v) -> props.put(k, v));
+ } catch (UnsupportedOperationException e) {
+ // Properties file format
+ FSDataInputStream is = null;
+ try {
+ is = fs.open(metafilePath.get());
+ props.load(is);
+ } catch (IOException ioe) {
+ throw new HoodieException("Error reading Hoodie partition metadata
from " + metafilePath, ioe);
+ } finally {
+ if (is != null) {
+ is.close();
+ }
}
}
}
// methods related to partition meta data
public static boolean hasPartitionMetadata(FileSystem fs, Path
partitionPath) {
+ return getPartitionMetafilePath(fs, partitionPath).isPresent();
+ }
+
+ /**
+ * Returns the name of the partition metadata.
+ *
+ * @param fs
+ * @param partitionPath
+ * @return Name of the partition metafile or empty option
+ */
+ public static Option<Path> getPartitionMetafilePath(FileSystem fs, Path
partitionPath) {
+ // The partition listing is a costly operation so instead we are searching
for existence of the files instead.
+ // This is in expected order as properties file based partition metafiles
should be the most common.
Review comment:
Can't we read the table config and determine what to do? E.g. if
HoodieTableConfig.PARTITION_METAFILE_USE_DATA_FORMAT=true, then check for the base
file format; else read a text properties file.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
##########
@@ -498,4 +502,46 @@ public void testBulkInsertRecords(String bulkInsertMode)
throws Exception {
public void testBulkInsertRecordsWithGlobalSort(String bulkInsertMode)
throws Exception {
testBulkInsertRecords(bulkInsertMode);
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testPartitionMetafileFormat(boolean
partitionMetafileUseDataFormat) throws Exception {
+ // By default there is no format specified for partition metafile
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).build();
+ HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)
HoodieSparkTable.create(config, context, metaClient);
+ assertFalse(table.getPartitionMetafileFormat().isPresent());
+
+ if (partitionMetafileUseDataFormat) {
+ // Add the setting to use datafile format
+ Properties properties = new Properties();
+
properties.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_DATA_FORMAT.key(),
"true");
+ initMetaClient(HoodieTableType.COPY_ON_WRITE, properties);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
assertTrue(metaClient.getTableConfig().getPartitionMetafileFormat().isPresent());
+ table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config,
context, metaClient);
+ assertTrue(table.getPartitionMetafileFormat().isPresent());
+ }
+
+ String instantTime = makeNewCommitTime();
+ SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+ writeClient.startCommitWithTime(instantTime);
+
+ // Insert new records
+ final JavaRDD<HoodieRecord> inputRecords =
generateTestRecordsForBulkInsert(jsc, 10);
+ writeClient.bulkInsert(inputRecords, instantTime);
+
+ // Partition metafile should be created
+ Path partitionPath = new Path(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+ assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs,
partitionPath));
+ Option<Path> metafilePath =
HoodiePartitionMetadata.getPartitionMetafilePath(fs, partitionPath);
+ if (partitionMetafileUseDataFormat) {
+ // Extension should be the same as the data file format of the table
+
assertTrue(metafilePath.get().toString().endsWith(table.getBaseFileFormat().getFileExtension()));
Review comment:
also validate the contents/data stored in the meta file?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
##########
@@ -117,30 +133,119 @@ public void trySave(int taskPartitionId) {
}
}
+ private String getMetafileExtension() {
+ // To be backwards compatible, there is no extension to the properties
file base partition metafile
+ return format.isPresent() ? format.get().getFileExtension() : "";
+ }
+
+ /**
+ * Write the partition metadata in the correct format in the given file path.
+ *
+ * @param filePath Path of the file to write
+ * @throws IOException
+ */
+ private void writeMetafile(Path filePath) throws IOException {
+ if (format.isPresent()) {
+ Schema schema = HoodieAvroUtils.getRecordKeySchema();
+
+ switch (format.get()) {
+ case PARQUET:
+ // Since we are only interested in saving metadata to the footer,
the schema, blocksizes and other
+ // parameters are not important.
+ MessageType type =
Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+ HoodieAvroWriteSupport writeSupport = new
HoodieAvroWriteSupport(type, schema, Option.empty());
+ try (ParquetWriter writer = new ParquetWriter(filePath,
writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
+ for (String key : props.stringPropertyNames()) {
+ writeSupport.addFooterMetadata(key, props.getProperty(key));
+ }
+ }
+ break;
+ case ORC:
+ // Since we are only interested in saving metadata to the footer,
the schema, blocksizes and other
+ // parameters are not important.
+ OrcFile.WriterOptions writerOptions =
OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
+ .setSchema(AvroOrcUtils.createOrcSchema(schema));
+ try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
+ for (String key : props.stringPropertyNames()) {
+ writer.addUserMetadata(key,
ByteBuffer.wrap(props.getProperty(key).getBytes()));
+ }
+ }
+ break;
+ default:
Review comment:
What about HFile?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
##########
@@ -117,30 +133,119 @@ public void trySave(int taskPartitionId) {
}
}
+ private String getMetafileExtension() {
+ // To be backwards compatible, there is no extension to the properties
file base partition metafile
+ return format.isPresent() ? format.get().getFileExtension() : "";
+ }
+
+ /**
+ * Write the partition metadata in the correct format in the given file path.
+ *
+ * @param filePath Path of the file to write
+ * @throws IOException
+ */
+ private void writeMetafile(Path filePath) throws IOException {
+ if (format.isPresent()) {
+ Schema schema = HoodieAvroUtils.getRecordKeySchema();
+
+ switch (format.get()) {
+ case PARQUET:
+ // Since we are only interested in saving metadata to the footer,
the schema, blocksizes and other
+ // parameters are not important.
+ MessageType type =
Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+ HoodieAvroWriteSupport writeSupport = new
HoodieAvroWriteSupport(type, schema, Option.empty());
+ try (ParquetWriter writer = new ParquetWriter(filePath,
writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
+ for (String key : props.stringPropertyNames()) {
+ writeSupport.addFooterMetadata(key, props.getProperty(key));
+ }
+ }
+ break;
+ case ORC:
+ // Since we are only interested in saving metadata to the footer,
the schema, blocksizes and other
+ // parameters are not important.
+ OrcFile.WriterOptions writerOptions =
OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
+ .setSchema(AvroOrcUtils.createOrcSchema(schema));
+ try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
+ for (String key : props.stringPropertyNames()) {
+ writer.addUserMetadata(key,
ByteBuffer.wrap(props.getProperty(key).getBytes()));
+ }
+ }
+ break;
+ default:
+ throw new HoodieException("Unsupported format for partition
metafiles: " + format.get());
+ }
+ } else {
+ // Backwards compatible properties file format
+ FSDataOutputStream os = fs.create(filePath, true);
+ props.store(os, "partition metadata");
+ os.hsync();
+ os.hflush();
+ os.close();
+ }
+ }
+
/**
* Read out the metadata for this partition.
*/
public void readFromFS() throws IOException {
- FSDataInputStream is = null;
+ Option<Path> metafilePath = getPartitionMetafilePath(fs, partitionPath);
+ if (!metafilePath.isPresent()) {
+ throw new HoodieException("Partition metafile not found in path " +
partitionPath);
+ }
+
try {
- Path metaFile = new Path(partitionPath, HOODIE_PARTITION_METAFILE);
- is = fs.open(metaFile);
- props.load(is);
- } catch (IOException ioe) {
- throw new HoodieException("Error reading Hoodie partition metadata for "
+ partitionPath, ioe);
- } finally {
- if (is != null) {
- is.close();
+ BaseFileUtils reader =
BaseFileUtils.getInstance(metafilePath.toString());
+ Map<String, String> metadata = reader.readFooter(fs.getConf(), true,
metafilePath.get(), PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
+ props.clear();
+ metadata.forEach((k, v) -> props.put(k, v));
+ } catch (UnsupportedOperationException e) {
+ // Properties file format
+ FSDataInputStream is = null;
+ try {
+ is = fs.open(metafilePath.get());
+ props.load(is);
+ } catch (IOException ioe) {
+ throw new HoodieException("Error reading Hoodie partition metadata
from " + metafilePath, ioe);
+ } finally {
+ if (is != null) {
+ is.close();
+ }
}
}
}
// methods related to partition meta data
public static boolean hasPartitionMetadata(FileSystem fs, Path
partitionPath) {
+ return getPartitionMetafilePath(fs, partitionPath).isPresent();
+ }
+
+ /**
+ * Returns the name of the partition metadata.
+ *
+ * @param fs
+ * @param partitionPath
+ * @return Name of the partition metafile or empty option
+ */
+ public static Option<Path> getPartitionMetafilePath(FileSystem fs, Path
partitionPath) {
+ // The partition listing is a costly operation so instead we are searching
for existence of the files instead.
+ // This is in expected order as properties file based partition metafiles
should be the most common.
try {
- return fs.exists(new Path(partitionPath,
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
+ Path metafilePath = new Path(partitionPath,
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX);
Review comment:
I think we can add this as a step to upgrade and downgrade? Then we
don't have to check anything — just assume that it's the same as the base file format.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]