This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d6358a9d602d4e62caf81a08b9f644f8e606088b
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Aug 15 09:38:59 2023 -0700

    [HUDI-6694] Fix log file CLI around command blocks (#9445)
    
    This commit fixes the log file CLI commands when the log file contains 
command blocks like rollback commands. The commit also adds the "File Path" 
column to the output for show logfile metadata CLI so it's easier to see the 
corresponding file path.
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    | 70 +++++++++++++++-------
 .../cli/commands/TestHoodieLogFileCommand.java     | 33 +++++++---
 2 files changed, 75 insertions(+), 28 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index cf36a704c7d..9a510bd466a 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
 import org.springframework.shell.standard.ShellComponent;
 import org.springframework.shell.standard.ShellMethod;
 import org.springframework.shell.standard.ShellOption;
@@ -91,15 +92,27 @@ public class HoodieLogFileCommand {
     FileSystem fs = HoodieCLI.getTableMetaClient().getFs();
     List<String> logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(fs, 
new Path(logFilePathPattern)).stream()
         .map(status -> 
status.getPath().toString()).collect(Collectors.toList());
-    Map<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, 
String>, Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata =
+    Map<String, List<Tuple3<Tuple2<String, HoodieLogBlockType>, 
Tuple2<Map<HeaderMetadataType, String>,
+        Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata =
         new HashMap<>();
     int numCorruptBlocks = 0;
     int dummyInstantTimeCount = 0;
+    String basePath = 
HoodieCLI.getTableMetaClient().getBasePathV2().toString();
 
     for (String logFilePath : logFilePaths) {
-      FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath));
-      Schema writerSchema = new AvroSchemaConverter()
-          
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, 
new Path(logFilePath))));
+      Path path = new Path(logFilePath);
+      String pathString = path.toString();
+      String fileName;
+      if (pathString.contains(basePath)) {
+        String[] split = pathString.split(basePath);
+        fileName = split[split.length - 1];
+      } else {
+        fileName = path.getName();
+      }
+      FileStatus[] fsStatus = fs.listStatus(path);
+      MessageType schema = TableSchemaResolver.readSchemaFromLogFile(fs, path);
+      Schema writerSchema = schema != null
+          ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) 
: null;
       Reader reader = HoodieLogFormat.newReader(fs, new 
HoodieLogFile(fsStatus[0].getPath()), writerSchema);
 
       // read the avro blocks
@@ -133,12 +146,15 @@ public class HoodieLogFileCommand {
         }
         if (commitCountAndMetadata.containsKey(instantTime)) {
           commitCountAndMetadata.get(instantTime).add(
-              new Tuple3<>(n.getBlockType(), new 
Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
+              new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
+                  new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), 
recordCount.get()));
         } else {
-          List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, 
String>, Map<HeaderMetadataType, String>>, Integer>> list =
+          List<Tuple3<Tuple2<String, HoodieLogBlockType>, 
Tuple2<Map<HeaderMetadataType, String>,
+              Map<HeaderMetadataType, String>>, Integer>> list =
               new ArrayList<>();
           list.add(
-              new Tuple3<>(n.getBlockType(), new 
Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
+              new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
+                  new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), 
recordCount.get()));
           commitCountAndMetadata.put(instantTime, list);
         }
       }
@@ -146,22 +162,27 @@ public class HoodieLogFileCommand {
     }
     List<Comparable[]> rows = new ArrayList<>();
     ObjectMapper objectMapper = new ObjectMapper();
-    for (Map.Entry<String, List<Tuple3<HoodieLogBlockType, 
Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, 
Integer>>> entry : commitCountAndMetadata
+    for (Map.Entry<String, List<Tuple3<Tuple2<String, HoodieLogBlockType>, 
Tuple2<Map<HeaderMetadataType, String>,
+        Map<HeaderMetadataType, String>>, Integer>>> entry : 
commitCountAndMetadata
         .entrySet()) {
       String instantTime = entry.getKey();
-      for (Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, 
Map<HeaderMetadataType, String>>, Integer> tuple3 : entry
+      for (Tuple3<Tuple2<String, HoodieLogBlockType>, 
Tuple2<Map<HeaderMetadataType, String>,
+          Map<HeaderMetadataType, String>>, Integer> tuple3 : entry
           .getValue()) {
-        Comparable[] output = new Comparable[5];
-        output[0] = instantTime;
-        output[1] = tuple3._3();
-        output[2] = tuple3._1().toString();
-        output[3] = objectMapper.writeValueAsString(tuple3._2()._1());
-        output[4] = objectMapper.writeValueAsString(tuple3._2()._2());
+        Comparable[] output = new Comparable[6];
+        output[0] = tuple3._1()._1();
+        output[1] = instantTime;
+        output[2] = tuple3._3();
+        output[3] = tuple3._1()._2().toString();
+        output[4] = objectMapper.writeValueAsString(tuple3._2()._1());
+        output[5] = objectMapper.writeValueAsString(tuple3._2()._2());
         rows.add(output);
       }
     }
 
-    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME)
+    TableHeader header = new TableHeader()
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_PATH)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_RECORD_COUNT)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_BLOCK_TYPE)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_HEADER_METADATA)
@@ -193,10 +214,16 @@ public class HoodieLogFileCommand {
 
     // TODO : readerSchema can change across blocks/log files, fix this inside 
Scanner
     AvroSchemaConverter converter = new AvroSchemaConverter();
+    Schema readerSchema = null;
     // get schema from last log file
-    Schema readerSchema =
-        
converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs,
 new Path(logFilePaths.get(logFilePaths.size() - 1)))));
-
+    for (int i = logFilePaths.size() - 1; i >= 0; i--) {
+      MessageType schema = TableSchemaResolver.readSchemaFromLogFile(fs, new 
Path(logFilePaths.get(i)));
+      if (schema != null) {
+        readerSchema = converter.convert(schema);
+        break;
+      }
+    }
+    Objects.requireNonNull(readerSchema);
     List<IndexedRecord> allRecords = new ArrayList<>();
 
     if (shouldMerge) {
@@ -232,8 +259,9 @@ public class HoodieLogFileCommand {
       }
     } else {
       for (String logFile : logFilePaths) {
-        Schema writerSchema = new AvroSchemaConverter()
-            
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(),
 new CachingPath(logFile))));
+        MessageType schema = 
TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new 
CachingPath(logFile));
+        Schema writerSchema = schema != null
+            ? new 
AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
         HoodieLogFormat.Reader reader =
             HoodieLogFormat.newReader(fs, new HoodieLogFile(new 
CachingPath(logFile)), writerSchema);
         // read the avro blocks
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 25298876c42..7a423452a87 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -69,6 +70,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -86,6 +88,7 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
 
   private String partitionPath;
   private HoodieAvroDataBlock dataBlock;
+  private HoodieCommandBlock commandBlock;
   private String tablePath;
   private FileSystem fs;
 
@@ -98,7 +101,7 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
     // Create table and connect
     String tableName = tableName();
     tablePath = tablePath(tableName);
-    partitionPath = Paths.get(tablePath, 
HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString();
+    partitionPath = Paths.get(tablePath, 
DEFAULT_FIRST_PARTITION_PATH).toString();
     new TableCommand().createTable(
         tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
         "", TimelineLayoutVersion.VERSION_1, 
"org.apache.hudi.common.model.HoodieAvroPayload");
@@ -109,7 +112,8 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
     try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
         .onParentPath(new Path(partitionPath))
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-        
.withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build()) {
+        .withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs)
+        .withSizeThreshold(1).build()) {
 
       // write data to file
       List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(0, 
100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
@@ -118,6 +122,14 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
       dataBlock = new HoodieAvroDataBlock(records, header, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
       writer.appendBlock(dataBlock);
+
+      Map<HoodieLogBlock.HeaderMetadataType, String> rollbackHeader = new 
HashMap<>();
+      rollbackHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, 
"103");
+      
rollbackHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, 
"102");
+      rollbackHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+          
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
+      commandBlock = new HoodieCommandBlock(rollbackHeader);
+      writer.appendBlock(commandBlock);
     }
   }
 
@@ -134,7 +146,9 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
     Object result = shell.evaluate(() -> "show logfile metadata 
--logFilePathPattern " + partitionPath + "/*");
     assertTrue(ShellEvaluationResultUtil.isSuccess(result));
 
-    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME)
+    TableHeader header = new TableHeader()
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_PATH)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_RECORD_COUNT)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_BLOCK_TYPE)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_HEADER_METADATA)
@@ -143,10 +157,15 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
     // construct expect result, there is only 1 line.
     List<Comparable[]> rows = new ArrayList<>();
     ObjectMapper objectMapper = new ObjectMapper();
-    String headerStr = 
objectMapper.writeValueAsString(dataBlock.getLogBlockHeader());
-    String footerStr = 
objectMapper.writeValueAsString(dataBlock.getLogBlockFooter());
-    Comparable[] output = new Comparable[] {INSTANT_TIME, 100, 
dataBlock.getBlockType(), headerStr, footerStr};
-    rows.add(output);
+    String logFileNamePrefix = DEFAULT_FIRST_PARTITION_PATH + 
"/test-log-fileid1_" + INSTANT_TIME + ".log";
+    rows.add(new Comparable[] {
+        logFileNamePrefix + ".1_1-0-1", INSTANT_TIME, 100, 
dataBlock.getBlockType(),
+        objectMapper.writeValueAsString(dataBlock.getLogBlockHeader()),
+        objectMapper.writeValueAsString(dataBlock.getLogBlockFooter())});
+    rows.add(new Comparable[] {
+        logFileNamePrefix + ".2_1-0-1", "103", 0, commandBlock.getBlockType(),
+        objectMapper.writeValueAsString(commandBlock.getLogBlockHeader()),
+        objectMapper.writeValueAsString(commandBlock.getLogBlockFooter())});
 
     String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", 
false, -1, false, rows);
     expected = removeNonWordAndStripSpace(expected);

Reply via email to the mailing list.