codope commented on code in PR #9379:
URL: https://github.com/apache/hudi/pull/9379#discussion_r1286118785


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1055,6 +1063,69 @@ public void close() throws Exception {
    */
   protected abstract void commit(String instantTime, 
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap);
 
+  /**
+   * Converts the input records to the input format expected by the write 
client.
+   * @param records records to be converted
+   * @return converted records
+   */
+  protected abstract I 
convertRecordsToWriteClientInput(HoodieData<HoodieRecord> records);
+
+  protected void commitInternal(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
+                                Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {
+    ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is 
not fully initialized yet.");
+    HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
+    I preppedRecordInputs = convertRecordsToWriteClientInput(preppedRecords);
+
+    try (BaseHoodieWriteClient<?, I, ?, O> writeClient = getWriteClient()) {
+      // rollback partially failed writes if any.
+      if (dataWriteConfig.getFailedWritesCleanPolicy().isEager() && 
writeClient.rollbackFailedWrites()) {
+        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+      }
+
+      if 
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
 {
+        // if this is a new commit being applied to metadata for the first time
+        LOG.info("New commit at " + instantTime + " being applied to MDT.");
+      } else {
+        // this code path refers to a re-attempted commit that:
+        //   1. got committed to metadata table, but failed in datatable.
+        //   2. failed while committing to metadata table
+        // for e.g., let's say compaction c1 on 1st attempt succeeded in 
metadata table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending 
compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will 
be created.
+        // once rollback is complete in datatable, compaction will be retried 
again, which will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually rollback 
the completed instant and proceed.
+        Option<HoodieInstant> alreadyCompletedInstant = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry 
-> entry.getTimestamp().equals(instantTime))
+            .lastInstant();
+        LOG.info(String.format("%s completed commit at %s being applied to 
MDT.",
+            alreadyCompletedInstant.isPresent() ? "Already" : "Partially", 
instantTime));
+
+        // Rollback the previous commit
+        if (!writeClient.rollback(instantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at 
" + instantTime + " from MDT");
+        }
+        metadataMetaClient.reloadActiveTimeline();
+      }
+
+      writeClient.startCommitWithTime(instantTime);
+      if (manuallyTransitionCommit) {
+        
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION,
 instantTime);
+      }

Review Comment:
   Why do we need this? Also, rename to `shouldTransitionCommitToInflight`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1055,6 +1063,69 @@ public void close() throws Exception {
    */
   protected abstract void commit(String instantTime, 
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap);
 
+  /**
+   * Converts the input records to the input format expected by the write 
client.
+   * @param records records to be converted
+   * @return converted records
+   */
+  protected abstract I 
convertRecordsToWriteClientInput(HoodieData<HoodieRecord> records);
+
+  protected void commitInternal(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
+                                Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {
+    ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is 
not fully initialized yet.");
+    HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
+    I preppedRecordInputs = convertRecordsToWriteClientInput(preppedRecords);
+
+    try (BaseHoodieWriteClient<?, I, ?, O> writeClient = getWriteClient()) {
+      // rollback partially failed writes if any.
+      if (dataWriteConfig.getFailedWritesCleanPolicy().isEager() && 
writeClient.rollbackFailedWrites()) {
+        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+      }
+
+      if 
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
 {
+        // if this is a new commit being applied to metadata for the first time
+        LOG.info("New commit at " + instantTime + " being applied to MDT.");
+      } else {
+        // this code path refers to a re-attempted commit that:
+        //   1. got committed to metadata table, but failed in datatable.
+        //   2. failed while committing to metadata table
+        // for e.g., let's say compaction c1 on 1st attempt succeeded in 
metadata table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending 
compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will 
be created.
+        // once rollback is complete in datatable, compaction will be retried 
again, which will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually rollback 
the completed instant and proceed.
+        Option<HoodieInstant> alreadyCompletedInstant = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry 
-> entry.getTimestamp().equals(instantTime))
+            .lastInstant();
+        LOG.info(String.format("%s completed commit at %s being applied to 
MDT.",
+            alreadyCompletedInstant.isPresent() ? "Already" : "Partially", 
instantTime));
+
+        // Rollback the previous commit
+        if (!writeClient.rollback(instantTime)) {
+          throw new HoodieMetadataException("Failed to rollback deltacommit at 
" + instantTime + " from MDT");
+        }
+        metadataMetaClient.reloadActiveTimeline();
+      }
+
+      writeClient.startCommitWithTime(instantTime);
+      if (manuallyTransitionCommit) {
+        
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION,
 instantTime);
+      }
+      O statuses;

Review Comment:
   Not being used — let's remove it?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1055,6 +1063,69 @@ public void close() throws Exception {
    */
   protected abstract void commit(String instantTime, 
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap);
 
+  /**
+   * Converts the input records to the input format expected by the write 
client.
+   * @param records records to be converted
+   * @return converted records
+   */
+  protected abstract I 
convertRecordsToWriteClientInput(HoodieData<HoodieRecord> records);
+
+  protected void commitInternal(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
+                                Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {

Review Comment:
   Is the partitioner necessary here? Why can't we bulk insert from the 
engine-specific implementation?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -119,68 +115,20 @@ protected void initRegistry() {
 
   @Override
   protected void commit(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap) {
-    commitInternal(instantTime, partitionRecordsMap, Option.empty());
+    commitInternal(instantTime, partitionRecordsMap, false, Option.empty());
   }
 
+  @Override
+  protected JavaRDD<HoodieRecord> 
convertRecordsToWriteClientInput(HoodieData<HoodieRecord> records) {
+    return HoodieJavaRDD.getJavaRDD(records);
+  }
+
+  @Override
   protected void bulkCommit(
       String instantTime, MetadataPartitionType partitionType, 
HoodieData<HoodieRecord> records,
       int fileGroupCount) {
     SparkHoodieMetadataBulkInsertPartitioner partitioner = new 
SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);
-    commitInternal(instantTime, Collections.singletonMap(partitionType, 
records), Option.of(partitioner));
-  }
-
-  private void commitInternal(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap,

Review Comment:
   OK, I see you have moved it to the superclass, so that's why you need the 
partitioner there. I guess you moved it to reuse it in the Java MDT writer?



##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java:
##########
@@ -225,6 +246,178 @@ protected HoodieJavaWriteClient 
getHoodieWriteClient(HoodieWriteConfig cfg) {
     return writeClient;
   }
 
+  public void syncTableMetadata(HoodieWriteConfig writeConfig) {
+    if (!writeConfig.getMetadataConfig().enabled()) {
+      return;
+    }
+    // Open up the metadata table again, for syncing
+    try (HoodieTableMetadataWriter writer = 
JavaHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context, 
Option.empty())) {
+      LOG.info("Successfully synced to metadata table");
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Error syncing to metadata table.", e);
+    }
+  }
+
+  protected HoodieTableMetadata metadata(HoodieWriteConfig clientConfig, 
HoodieEngineContext engineContext) {
+    return HoodieTableMetadata.create(engineContext, 
clientConfig.getMetadataConfig(), clientConfig.getBasePath());
+  }
+
+  /**
+   * Validate the metadata tables contents to ensure it matches what is on the 
file system.
+   */
+  public void validateMetadata(HoodieTestTable testTable, List<String> 
inflightCommits, HoodieWriteConfig writeConfig,
+                               String metadataTableBasePath, boolean 
doFullValidation) throws IOException {
+    HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
+    assertNotNull(tableMetadata, "MetadataReader should have been 
initialized");
+    if (!writeConfig.isMetadataTableEnabled()) {
+      return;
+    }
+
+    if (tableMetadata instanceof FileSystemBackedTableMetadata || 
!tableMetadata.getSyncedInstantTime().isPresent()) {
+      throw new IllegalStateException("Metadata should have synced some 
commits or tableMetadata should not be an instance "
+          + "of FileSystemBackedTableMetadata");
+    }
+    assertEquals(inflightCommits, testTable.inflightCommits());
+
+    HoodieTimer timer = HoodieTimer.start();
+    HoodieJavaEngineContext engineContext = new 
HoodieJavaEngineContext(hadoopConf);
+
+    // Partitions should match
+    List<java.nio.file.Path> fsPartitionPaths = 
testTable.getAllPartitionPaths();
+    List<String> fsPartitions = new ArrayList<>();
+    fsPartitionPaths.forEach(entry -> 
fsPartitions.add(entry.getFileName().toString()));
+    if (fsPartitions.isEmpty() && testTable.isNonPartitioned()) {
+      fsPartitions.add("");
+    }
+    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
+
+    Collections.sort(fsPartitions);
+    Collections.sort(metadataPartitions);
+
+    assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions 
should match");
+    assertEquals(fsPartitions, metadataPartitions, "Partitions should match");
+
+    // Files within each partition should match
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieJavaTable.create(writeConfig, engineContext);
+    TableFileSystemView tableView = table.getHoodieView();
+    List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> 
basePath + "/" + partition).collect(Collectors.toList());
+    Map<String, FileStatus[]> partitionToFilesMap = 
tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
+    assertEquals(fsPartitions.size(), partitionToFilesMap.size());
+
+    fsPartitions.forEach(partition -> {
+      try {
+        validateFilesPerPartition(testTable, tableMetadata, tableView, 
partitionToFilesMap, partition);
+      } catch (IOException e) {
+        fail("Exception should not be raised: " + e);
+      }
+    });
+    if (doFullValidation) {
+      runFullValidation(table.getConfig().getMetadataConfig(), writeConfig, 
metadataTableBasePath, engineContext);
+    }
+
+    LOG.info("Validation time=" + timer.endTimer());
+  }
+
+  protected void validateFilesPerPartition(HoodieTestTable testTable, 
HoodieTableMetadata tableMetadata, TableFileSystemView tableView,
+                                           Map<String, FileStatus[]> 
partitionToFilesMap, String partition) throws IOException {
+    Path partitionPath;
+    if (partition.equals("")) {
+      // Should be the non-partitioned case
+      partitionPath = new Path(basePath);
+    } else {
+      partitionPath = new Path(basePath, partition);
+    }
+
+    FileStatus[] fsStatuses = testTable.listAllFilesInPartition(partition);
+    FileStatus[] metaStatuses = 
tableMetadata.getAllFilesInPartition(partitionPath);
+    List<String> fsFileNames = Arrays.stream(fsStatuses)
+        .map(s -> s.getPath().getName()).collect(Collectors.toList());
+    List<String> metadataFilenames = Arrays.stream(metaStatuses)
+        .map(s -> s.getPath().getName()).collect(Collectors.toList());
+    Collections.sort(fsFileNames);
+    Collections.sort(metadataFilenames);
+
+    assertLinesMatch(fsFileNames, metadataFilenames);
+    assertEquals(fsStatuses.length, 
partitionToFilesMap.get(partitionPath.toString()).length);
+
+    // Block sizes should be valid
+    Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
+    List<Long> fsBlockSizes = 
Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList());
+    List<Long> metadataBlockSizes = 
Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList());
+    assertEquals(fsBlockSizes, metadataBlockSizes);
+
+    assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within 
partition " + partition + " should match");
+    assertEquals(fsFileNames, metadataFilenames, "Files within partition " + 
partition + " should match");
+
+    // FileSystemView should expose the same data
+    List<HoodieFileGroup> fileGroups = 
tableView.getAllFileGroups(partition).collect(Collectors.toList());
+    
fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
+
+    fileGroups.forEach(g -> 
LoggerFactory.getLogger(getClass()).info(g.toString()));
+    fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> 
LoggerFactory.getLogger(getClass()).info(b.toString())));
+    fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> 
LoggerFactory.getLogger(getClass()).info(s.toString())));
+
+    long numFiles = fileGroups.stream()
+        .mapToLong(g -> g.getAllBaseFiles().count() + 
g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
+        .sum();
+    assertEquals(metadataFilenames.size(), numFiles);
+  }
+
+  protected HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteConfig 
clientConfig) {
+    return (HoodieBackedTableMetadataWriter) 
JavaHoodieBackedTableMetadataWriter
+        .create(hadoopConf, clientConfig, new 
HoodieJavaEngineContext(hadoopConf), Option.empty());
+  }
+
+  private void runFullValidation(HoodieMetadataConfig metadataConfig,

Review Comment:
   Do we need the metadata config? It's enabled by default. All we need is to 
validate the files listed with and without metadata, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to