This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d6358a9d602d4e62caf81a08b9f644f8e606088b Author: Y Ethan Guo <[email protected]> AuthorDate: Tue Aug 15 09:38:59 2023 -0700 [HUDI-6694] Fix log file CLI around command blocks (#9445) This commit fixes the log file CLI commands when the log file contains command blocks like rollback commands. The commit also adds the "File Path" column to the output for show logfile metadata CLI so it's easier to see the corresponding file path. --- .../hudi/cli/commands/HoodieLogFileCommand.java | 70 +++++++++++++++------- .../cli/commands/TestHoodieLogFileCommand.java | 33 +++++++--- 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index cf36a704c7d..9a510bd466a 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -51,6 +51,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; import org.springframework.shell.standard.ShellComponent; import org.springframework.shell.standard.ShellMethod; import org.springframework.shell.standard.ShellOption; @@ -91,15 +92,27 @@ public class HoodieLogFileCommand { FileSystem fs = HoodieCLI.getTableMetaClient().getFs(); List<String> logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(logFilePathPattern)).stream() .map(status -> status.getPath().toString()).collect(Collectors.toList()); - Map<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata = + Map<String, List<Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>, + Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata = new HashMap<>(); int numCorruptBlocks = 0; int dummyInstantTimeCount = 0; + String basePath = HoodieCLI.getTableMetaClient().getBasePathV2().toString(); for (String logFilePath : logFilePaths) { - FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath)); - Schema writerSchema = new AvroSchemaConverter() - .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath)))); + Path path = new Path(logFilePath); + String pathString = path.toString(); + String fileName; + if (pathString.contains(basePath)) { + String[] split = pathString.split(basePath); + fileName = split[split.length - 1]; + } else { + fileName = path.getName(); + } + FileStatus[] fsStatus = fs.listStatus(path); + MessageType schema = TableSchemaResolver.readSchemaFromLogFile(fs, path); + Schema writerSchema = schema != null + ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null; Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema); // read the avro blocks @@ -133,12 +146,15 @@ public class HoodieLogFileCommand { } if (commitCountAndMetadata.containsKey(instantTime)) { commitCountAndMetadata.get(instantTime).add( - new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get())); + new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()), + new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get())); } else { - List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>> list = + List<Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>, + Map<HeaderMetadataType, String>>, Integer>> list = new ArrayList<>(); list.add( - new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get())); + new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()), + new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get())); commitCountAndMetadata.put(instantTime, list); } } @@ -146,22 +162,27 @@ public class HoodieLogFileCommand { } List<Comparable[]> rows = new ArrayList<>(); ObjectMapper objectMapper = new ObjectMapper(); - for (Map.Entry<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata + for (Map.Entry<String, List<Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>, + Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata .entrySet()) { String instantTime = entry.getKey(); - for (Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer> tuple3 : entry + for (Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>, + Map<HeaderMetadataType, String>>, Integer> tuple3 : entry .getValue()) { - Comparable[] output = new Comparable[5]; - output[0] = instantTime; - output[1] = tuple3._3(); - output[2] = tuple3._1().toString(); - output[3] = objectMapper.writeValueAsString(tuple3._2()._1()); - output[4] = objectMapper.writeValueAsString(tuple3._2()._2()); + Comparable[] output = new Comparable[6]; + output[0] = tuple3._1()._1(); + output[1] = instantTime; + output[2] = tuple3._3(); + output[3] = tuple3._1()._2().toString(); + output[4] = objectMapper.writeValueAsString(tuple3._2()._1()); + output[5] = objectMapper.writeValueAsString(tuple3._2()._2()); rows.add(output); } } - TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME) + TableHeader header = new TableHeader() + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME) .addTableHeaderField(HoodieTableHeaderFields.HEADER_RECORD_COUNT) .addTableHeaderField(HoodieTableHeaderFields.HEADER_BLOCK_TYPE) .addTableHeaderField(HoodieTableHeaderFields.HEADER_HEADER_METADATA) @@ -193,10 +214,16 @@ public class HoodieLogFileCommand { // TODO : readerSchema can change across blocks/log files, fix this inside Scanner AvroSchemaConverter converter = new AvroSchemaConverter(); + Schema readerSchema = null; // get schema from last log file - Schema readerSchema = - converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1))))); - + for (int i = logFilePaths.size() - 1; i >= 0; i--) { + MessageType schema = TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePaths.get(i))); + if (schema != null) { + readerSchema = converter.convert(schema); + break; + } + } + Objects.requireNonNull(readerSchema); List<IndexedRecord> allRecords = new ArrayList<>(); if (shouldMerge) { @@ -232,8 +259,9 @@ public class HoodieLogFileCommand { } } else { for (String logFile : logFilePaths) { - Schema writerSchema = new AvroSchemaConverter() - .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new CachingPath(logFile)))); + MessageType schema = TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new CachingPath(logFile)); + Schema writerSchema = schema != null + ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null; HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(new CachingPath(logFile)), writerSchema); // read the avro blocks diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java index 25298876c42..7a423452a87 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.testutils.SchemaTestUtil; @@ -69,6 +70,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -86,6 +88,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness { private String partitionPath; private HoodieAvroDataBlock dataBlock; + private HoodieCommandBlock commandBlock; private String tablePath; private FileSystem fs; @@ -98,7 +101,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness { // Create table and connect String tableName = tableName(); tablePath = tablePath(tableName); - partitionPath = Paths.get(tablePath, HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString(); + partitionPath = Paths.get(tablePath, DEFAULT_FIRST_PARTITION_PATH).toString(); new TableCommand().createTable( tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); @@ -109,7 +112,8 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness { try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder() .onParentPath(new Path(partitionPath)) .withFileExtension(HoodieLogFile.DELTA_EXTENSION) - .withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build()) { + .withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs) + .withSizeThreshold(1).build()) { // write data to file List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()); @@ -118,6 +122,14 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness { header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); dataBlock = new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); writer.appendBlock(dataBlock); + + Map<HoodieLogBlock.HeaderMetadataType, String> rollbackHeader = new HashMap<>(); + rollbackHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103"); + rollbackHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "102"); + rollbackHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); + commandBlock = new HoodieCommandBlock(rollbackHeader); + writer.appendBlock(commandBlock); } } @@ -134,7 +146,9 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness { Object result = shell.evaluate(() -> "show logfile metadata --logFilePathPattern " + partitionPath + "/*"); assertTrue(ShellEvaluationResultUtil.isSuccess(result)); - TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME) + TableHeader header = new TableHeader() + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME) .addTableHeaderField(HoodieTableHeaderFields.HEADER_RECORD_COUNT) .addTableHeaderField(HoodieTableHeaderFields.HEADER_BLOCK_TYPE) .addTableHeaderField(HoodieTableHeaderFields.HEADER_HEADER_METADATA) @@ -143,10 +157,15 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness { // construct expect result, there is only 1 line. List<Comparable[]> rows = new ArrayList<>(); ObjectMapper objectMapper = new ObjectMapper(); - String headerStr = objectMapper.writeValueAsString(dataBlock.getLogBlockHeader()); - String footerStr = objectMapper.writeValueAsString(dataBlock.getLogBlockFooter()); - Comparable[] output = new Comparable[] {INSTANT_TIME, 100, dataBlock.getBlockType(), headerStr, footerStr}; - rows.add(output); + String logFileNamePrefix = DEFAULT_FIRST_PARTITION_PATH + "/test-log-fileid1_" + INSTANT_TIME + ".log"; + rows.add(new Comparable[] { + logFileNamePrefix + ".1_1-0-1", INSTANT_TIME, 100, dataBlock.getBlockType(), + objectMapper.writeValueAsString(dataBlock.getLogBlockHeader()), + objectMapper.writeValueAsString(dataBlock.getLogBlockFooter())}); + rows.add(new Comparable[] { + logFileNamePrefix + ".2_1-0-1", "103", 0, commandBlock.getBlockType(), + objectMapper.writeValueAsString(commandBlock.getLogBlockHeader()), + objectMapper.writeValueAsString(commandBlock.getLogBlockFooter())}); String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows); expected = removeNonWordAndStripSpace(expected);
