the-other-tim-brown commented on code in PR #9379:
URL: https://github.com/apache/hudi/pull/9379#discussion_r1286196942


##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java:
##########
@@ -225,6 +246,178 @@ protected HoodieJavaWriteClient 
getHoodieWriteClient(HoodieWriteConfig cfg) {
     return writeClient;
   }
 
+  public void syncTableMetadata(HoodieWriteConfig writeConfig) {
+    if (!writeConfig.getMetadataConfig().enabled()) {
+      return;
+    }
+    // Open up the metadata table again, for syncing
+    try (HoodieTableMetadataWriter writer = 
JavaHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context, 
Option.empty())) {
+      LOG.info("Successfully synced to metadata table");
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Error syncing to metadata table.", e);
+    }
+  }
+
+  protected HoodieTableMetadata metadata(HoodieWriteConfig clientConfig, 
HoodieEngineContext engineContext) {
+    return HoodieTableMetadata.create(engineContext, 
clientConfig.getMetadataConfig(), clientConfig.getBasePath());
+  }
+
  /**
   * Validates the metadata table's contents against what is actually on the file system:
   * inflight commits, partition listings, and per-partition file listings must all agree.
   *
   * @param testTable             test-table handle used to enumerate the file system state
   * @param inflightCommits       commits expected to still be inflight on the test table
   * @param writeConfig           write config of the table under test; a no-op when the
   *                              metadata table is disabled
   * @param metadataTableBasePath base path of the metadata table, used for full validation
   * @param doFullValidation      when true, additionally runs the deeper metadata-table checks
   * @throws IOException if listing partitions or files fails
   */
  public void validateMetadata(HoodieTestTable testTable, List<String> inflightCommits, HoodieWriteConfig writeConfig,
                               String metadataTableBasePath, boolean doFullValidation) throws IOException {
    // NOTE(review): the reader is constructed before the enabled() short-circuit below,
    // so a reader is always created even for disabled metadata tables — confirm intended.
    HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
    assertNotNull(tableMetadata, "MetadataReader should have been initialized");
    if (!writeConfig.isMetadataTableEnabled()) {
      return;
    }

    // With the metadata table enabled we must have a metadata-backed reader that has
    // synced at least one commit; a file-system-backed fallback means sync never happened.
    if (tableMetadata instanceof FileSystemBackedTableMetadata || !tableMetadata.getSyncedInstantTime().isPresent()) {
      throw new IllegalStateException("Metadata should have synced some commits or tableMetadata should not be an instance "
          + "of FileSystemBackedTableMetadata");
    }
    assertEquals(inflightCommits, testTable.inflightCommits());

    HoodieTimer timer = HoodieTimer.start();
    HoodieJavaEngineContext engineContext = new HoodieJavaEngineContext(hadoopConf);

    // Partitions should match
    List<java.nio.file.Path> fsPartitionPaths = testTable.getAllPartitionPaths();
    List<String> fsPartitions = new ArrayList<>();
    fsPartitionPaths.forEach(entry -> fsPartitions.add(entry.getFileName().toString()));
    // "" stands in for the single implicit partition of a non-partitioned table.
    if (fsPartitions.isEmpty() && testTable.isNonPartitioned()) {
      fsPartitions.add("");
    }
    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();

    // Sort both sides so the list comparison is order-insensitive.
    Collections.sort(fsPartitions);
    Collections.sort(metadataPartitions);

    assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
    assertEquals(fsPartitions, metadataPartitions, "Partitions should match");

    // Files within each partition should match
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieJavaTable.create(writeConfig, engineContext);
    TableFileSystemView tableView = table.getHoodieView();
    List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
    Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
    assertEquals(fsPartitions.size(), partitionToFilesMap.size());

    // Compare file listings partition-by-partition; an IOException here is a test failure,
    // not an expected outcome, hence the fail() inside the lambda.
    fsPartitions.forEach(partition -> {
      try {
        validateFilesPerPartition(testTable, tableMetadata, tableView, partitionToFilesMap, partition);
      } catch (IOException e) {
        fail("Exception should not be raised: " + e);
      }
    });
    if (doFullValidation) {
      runFullValidation(table.getConfig().getMetadataConfig(), writeConfig, metadataTableBasePath, engineContext);
    }

    LOG.info("Validation time=" + timer.endTimer());
  }
+
+  protected void validateFilesPerPartition(HoodieTestTable testTable, 
HoodieTableMetadata tableMetadata, TableFileSystemView tableView,
+                                           Map<String, FileStatus[]> 
partitionToFilesMap, String partition) throws IOException {
+    Path partitionPath;
+    if (partition.equals("")) {
+      // Should be the non-partitioned case
+      partitionPath = new Path(basePath);
+    } else {
+      partitionPath = new Path(basePath, partition);
+    }
+
+    FileStatus[] fsStatuses = testTable.listAllFilesInPartition(partition);
+    FileStatus[] metaStatuses = 
tableMetadata.getAllFilesInPartition(partitionPath);
+    List<String> fsFileNames = Arrays.stream(fsStatuses)
+        .map(s -> s.getPath().getName()).collect(Collectors.toList());
+    List<String> metadataFilenames = Arrays.stream(metaStatuses)
+        .map(s -> s.getPath().getName()).collect(Collectors.toList());
+    Collections.sort(fsFileNames);
+    Collections.sort(metadataFilenames);
+
+    assertLinesMatch(fsFileNames, metadataFilenames);
+    assertEquals(fsStatuses.length, 
partitionToFilesMap.get(partitionPath.toString()).length);
+
+    // Block sizes should be valid
+    Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
+    List<Long> fsBlockSizes = 
Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList());
+    List<Long> metadataBlockSizes = 
Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList());
+    assertEquals(fsBlockSizes, metadataBlockSizes);
+
+    assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within 
partition " + partition + " should match");
+    assertEquals(fsFileNames, metadataFilenames, "Files within partition " + 
partition + " should match");
+
+    // FileSystemView should expose the same data
+    List<HoodieFileGroup> fileGroups = 
tableView.getAllFileGroups(partition).collect(Collectors.toList());
+    
fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
+
+    fileGroups.forEach(g -> 
LoggerFactory.getLogger(getClass()).info(g.toString()));
+    fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> 
LoggerFactory.getLogger(getClass()).info(b.toString())));
+    fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> 
LoggerFactory.getLogger(getClass()).info(s.toString())));
+
+    long numFiles = fileGroups.stream()
+        .mapToLong(g -> g.getAllBaseFiles().count() + 
g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
+        .sum();
+    assertEquals(metadataFilenames.size(), numFiles);
+  }
+
+  protected HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteConfig 
clientConfig) {
+    return (HoodieBackedTableMetadataWriter) 
JavaHoodieBackedTableMetadataWriter
+        .create(hadoopConf, clientConfig, new 
HoodieJavaEngineContext(hadoopConf), Option.empty());
+  }
+
+  private void runFullValidation(HoodieMetadataConfig metadataConfig,

Review Comment:
   No, good catch. This was ported over from a spark test suite



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to