This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 31e674eb57c7e01af783dccb01828a0b6eb2a632
Author: Vinoth Chandar <[email protected]>
AuthorDate: Mon Jan 4 01:15:49 2021 -0800

    [HUDI-1504] Allow log files generated during restore/rollback to be synced as well
    
     - TestHoodieBackedMetadata#testSync etc. now run for MOR tables
     - HUDI-1502 is still pending and has issues for MOR/rollbacks
     - Also addressed a bunch of code review comments.
---
 .../apache/hudi/cli/commands/MetadataCommand.java  |  4 +-
 .../apache/hudi/client/CompactionAdminClient.java  |  5 --
 .../hudi/table/HoodieTimelineArchiveLog.java       |  5 --
 .../apache/hudi/client/SparkRDDWriteClient.java    |  2 +-
 .../SparkHoodieBackedTableMetadataWriter.java      |  7 +-
 .../hudi/metadata/TestHoodieBackedMetadata.java    | 79 ++++++++++------------
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  2 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  4 +-
 .../hudi/metadata/HoodieMetadataPayload.java       |  2 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 11 ++-
 10 files changed, 53 insertions(+), 68 deletions(-)
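
For readers skimming the patch: the behavioral core of this commit is in
HoodieTableMetadataUtil. A rollback (or restore) can append log files, as it
does on MOR tables, and those appends must be synced to the metadata table
even when the commit being rolled back was itself never synced. The sketch
below is a minimal, self-contained restatement of that decision logic, not
the actual Hudi code: the RollbackPartition type and the two accumulator maps
are simplified stand-ins for Hudi's rollback-metadata structures, a plain
lexicographic compare stands in for HoodieTimeline.compareTimestamps, and the
append-file handling is inferred from the early-return condition, since the
tail of that hunk is not shown in the diff.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    // Minimal sketch of the rollback-sync decision added in this commit.
    // RollbackPartition and the accumulator maps are simplified stand-ins
    // for Hudi's HoodieRollbackMetadata structures, not the real API.
    public class RollbackSyncSketch {

      static class RollbackPartition {
        String partitionPath;
        List<String> successDeleteFiles;  // files deleted by the rollback
        Map<String, Long> appendFiles;    // log files written by the rollback, with sizes
      }

      static void syncRollback(RollbackPartition pm, String commitRolledBack,
                               Optional<String> lastSyncTs,
                               Map<String, List<String>> partitionToDeletedFiles,
                               Map<String, Map<String, Long>> partitionToAppendedFiles) {
        // Has this rollback produced new files? (MOR rollbacks append rollback blocks.)
        boolean hasAppendFiles =
            pm.appendFiles.values().stream().mapToLong(Long::longValue).sum() > 0;

        // Instant timestamps order lexicographically; this stands in for
        // HoodieTimeline.compareTimestamps(..., GREATER_THAN, ...).
        boolean shouldSkip = lastSyncTs.isPresent()
            && commitRolledBack.compareTo(lastSyncTs.get()) > 0;

        // If the rolled-back commit never reached the metadata table and the
        // rollback wrote no log files, there is nothing to sync.
        if (!hasAppendFiles && shouldSkip) {
          return;
        }

        // Deletions only matter if the rolled-back commit had been synced.
        if (!pm.successDeleteFiles.isEmpty() && !shouldSkip) {
          partitionToDeletedFiles
              .computeIfAbsent(pm.partitionPath, k -> new ArrayList<>())
              .addAll(pm.successDeleteFiles);
        }

        // Log files appended by the rollback are synced unconditionally.
        if (hasAppendFiles) {
          partitionToAppendedFiles
              .computeIfAbsent(pm.partitionPath, k -> new HashMap<>())
              .putAll(pm.appendFiles);
        }
      }
    }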

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index f8a8eed..8eac8dd 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -29,7 +29,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadata;
-import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.springframework.shell.core.CommandMarker;
@@ -117,7 +117,7 @@ public class MetadataCommand implements CommandMarker {
       // Metadata directory does not exist
     }
 
-    return String.format("Removed Metdata Table from %s", metadataPath);
+    return String.format("Removed Metadata Table from %s", metadataPath);
   }
 
   @CliCommand(value = "metadata init", help = "Update the metadata table from commits since the creation")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 368c6b6..a2ecb67 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -481,11 +481,6 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     throw new HoodieException("FileGroupId " + fgId + " not in pending compaction");
   }
 
-  @Override
-  protected void initWrapperFSMetrics() {
-    // no-op
-  }
-
   /**
    * Holds Operation result for Renaming.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 2a147f7..80724c8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -88,11 +88,6 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
   private final HoodieTable<T, I, K, O> table;
   private final HoodieTableMetaClient metaClient;
 
-  /*
-  public HoodieTimelineArchiveLog(HoodieWriteConfig config, Configuration configuration) {
-    this(config, HoodieTable.create(config, configuration));
-  }*/
-
   public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
     this.config = config;
     this.table = table;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index ec98155..9a22f78 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -44,7 +44,7 @@ import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndex;
 import org.apache.hudi.metrics.DistributedRegistry;
-import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
similarity index 96%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/SparkHoodieBackedTableMetadataWriter.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 950144b..262ad0e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.metrics;
+package org.apache.hudi.metadata;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -35,10 +35,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
-import org.apache.hudi.metadata.HoodieMetadataMetrics;
-import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index 313eda2..34c0a35 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -18,24 +18,6 @@
 
 package org.apache.hudi.metadata;
 
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -66,10 +48,12 @@ import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -77,8 +61,24 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);
 
@@ -172,13 +172,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   /**
    * Test various table operations sync to Metadata Table correctly.
    */
-  //@ParameterizedTest
-  //@EnumSource(HoodieTableType.class)
-  //public void testTableOperations(HoodieTableType tableType) throws Exception {
-  @Test
-  public void testTableOperations() throws Exception {
-    //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
-    init(HoodieTableType.COPY_ON_WRITE);
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testTableOperations(HoodieTableType tableType) throws Exception {
+    init(tableType);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
@@ -281,7 +278,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       assertNoWriteErrors(writeStatuses);
       validateMetadata(client);
 
-      // Rollback of inserts
+      // Write 2 (inserts) + Rollback of inserts
       newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       client.startCommitWithTime(newCommitTime);
       records = dataGen.generateInserts(newCommitTime, 20);
@@ -292,7 +289,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       client.syncTableMetadata();
       validateMetadata(client);
 
-      // Rollback of updates
+      // Write 3 (updates) + Rollback of updates
       newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       client.startCommitWithTime(newCommitTime);
       records = dataGen.generateUniqueUpdates(newCommitTime, 20);
@@ -341,7 +338,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       client.rollback(newCommitTime);
       client.syncTableMetadata();
       validateMetadata(client);
-
     }
 
     // Rollback of partial commits
@@ -411,13 +407,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   /**
    * Test sync of table operations.
    */
-  //@ParameterizedTest
-  //@EnumSource(HoodieTableType.class)
-  //public void testSync(HoodieTableType tableType) throws Exception {
-  @Test
-  public void testSync() throws Exception {
-    //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
-    init(HoodieTableType.COPY_ON_WRITE);
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testSync(HoodieTableType tableType) throws Exception {
+    init(tableType);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
     String newCommitTime;
@@ -453,6 +446,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
     }
 
     // Various table operations without metadata table enabled
+    String restoreToInstant;
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
       // updates
       newCommitTime = HoodieActiveTimeline.createNewInstantTime();
@@ -479,7 +473,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       }
 
       // Savepoint
-      String savepointInstant = newCommitTime;
+      restoreToInstant = newCommitTime;
       if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
         client.savepoint("hoodie", "metadata test");
         assertFalse(metadata(client).isInSync());
@@ -505,21 +499,20 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
       assertFalse(metadata(client).isInSync());
-
-      client.restoreToInstant(savepointInstant);
-      assertFalse(metadata(client).isInSync());
     }
 
-
     // Enable metadata table and ensure it is synced
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
+      client.restoreToInstant(restoreToInstant);
+      assertFalse(metadata(client).isInSync());
+
       newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       client.startCommitWithTime(newCommitTime);
 
       validateMetadata(client);
       assertTrue(metadata(client).isInSync());
     }
-
   }
 
   /**
@@ -673,8 +666,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
   /**
   * Test when reading from metadata table which is out of sync with dataset that results are still consistent.
    */
-  //  @ParameterizedTest
-  //  @EnumSource(HoodieTableType.class)
   @Test
   public void testMetadataOutOfSync() throws Exception {
     init(HoodieTableType.COPY_ON_WRITE);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index d671ec8..2d638b4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -206,7 +206,7 @@ public class FSUtils {
   public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr) throws IOException {
     // If the basePathStr is a folder within the .hoodie directory then we are listing partitions within an
     // internal table.
-    final boolean isMetadataTable = basePathStr.contains(HoodieTableMetaClient.METAFOLDER_NAME);
+    final boolean isMetadataTable = HoodieTableMetadata.isMetadataTable(basePathStr);
     final Path basePath = new Path(basePathStr);
     final List<String> partitions = new ArrayList<>();
     processFiles(fs, basePathStr, (locatedFileStatus) -> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index f62d9d8..33371da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -270,10 +270,10 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
     }
 
     HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    List<HoodieInstant> unsyncedInstants = findInstantsToSync(datasetMetaClient);
+    List<HoodieInstant> unSyncedInstants = findInstantsToSync(datasetMetaClient);
     Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
     timelineRecordScanner =
-        new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unsyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null);
+        new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null);
   }
 
   protected List<HoodieInstant> findInstantsToSync() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 0886436..0863f7e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -171,7 +171,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
    * Returns the list of filenames deleted as part of this record.
    */
   public List<String> getDeletions() {
-    return filterFileInfoEntries(true).map(e -> e.getKey()).sorted().collect(Collectors.toList());
+    return filterFileInfoEntries(true).map(Map.Entry::getKey).sorted().collect(Collectors.toList());
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 3017e82..115001a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -238,13 +238,20 @@ public class HoodieTableMetadataUtil {
                                               Option<String> lastSyncTs) {
 
     rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
+      // Has this rollback produced new files?
+      boolean hasAppendFiles = pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
       // If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata
-      if (lastSyncTs.isPresent() && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get())) {
+      boolean shouldSkip = lastSyncTs.isPresent()
+          && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get());
+
+      if (!hasAppendFiles && shouldSkip) {
+        LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, given metadata table is already synced up to %s",
+            rollbackMetadata.getCommitsRollback().get(0), lastSyncTs.get()));
         return;
       }
 
       final String partition = pm.getPartitionPath();
-      if (!pm.getSuccessDeleteFiles().isEmpty()) {
+      if (!pm.getSuccessDeleteFiles().isEmpty() && !shouldSkip) {
         if (!partitionToDeletedFiles.containsKey(partition)) {
           partitionToDeletedFiles.put(partition, new ArrayList<>());
         }
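
A side note on the test changes: @ParameterizedTest with
@EnumSource(HoodieTableType.class) runs the annotated method once per enum
constant, which is how testTableOperations and testSync regain MOR coverage
above. A minimal sketch with a hypothetical test class (standard JUnit 5, with
junit-jupiter-params on the classpath):

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.EnumSource;

    public class TableTypeParamSketch {
      enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

      // Executes twice: once with COPY_ON_WRITE, once with MERGE_ON_READ.
      @ParameterizedTest
      @EnumSource(TableType.class)
      void runsForEachTableType(TableType tableType) {
        // init(tableType); then write, rollback, syncTableMetadata, validate,
        // mirroring the re-enabled tests in the diff above.
      }
    }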
