This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 31e674eb57c7e01af783dccb01828a0b6eb2a632 Author: Vinoth Chandar <[email protected]> AuthorDate: Mon Jan 4 01:15:49 2021 -0800 [HUDI-1504] Allow log files generated during restore/rollback to be synced as well - TestHoodieBackedMetadata#testSync etc now run for MOR tables - HUDI-1502 is still pending and has issues for MOR/rollbacks - Also addressed bunch of code review comments. --- .../apache/hudi/cli/commands/MetadataCommand.java | 4 +- .../apache/hudi/client/CompactionAdminClient.java | 5 -- .../hudi/table/HoodieTimelineArchiveLog.java | 5 -- .../apache/hudi/client/SparkRDDWriteClient.java | 2 +- .../SparkHoodieBackedTableMetadataWriter.java | 7 +- .../hudi/metadata/TestHoodieBackedMetadata.java | 79 ++++++++++------------ .../java/org/apache/hudi/common/fs/FSUtils.java | 2 +- .../apache/hudi/metadata/BaseTableMetadata.java | 4 +- .../hudi/metadata/HoodieMetadataPayload.java | 2 +- .../hudi/metadata/HoodieTableMetadataUtil.java | 11 ++- 10 files changed, 53 insertions(+), 68 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java index f8a8eed..8eac8dd 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java @@ -29,7 +29,7 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadata; -import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.spark.api.java.JavaSparkContext; import org.springframework.shell.core.CommandMarker; @@ -117,7 +117,7 @@ public class MetadataCommand implements CommandMarker { // Metadata directory does not exist } - return String.format("Removed Metdata Table 
from %s", metadataPath); + return String.format("Removed Metadata Table from %s", metadataPath); } @CliCommand(value = "metadata init", help = "Update the metadata table from commits since the creation") diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java index 368c6b6..a2ecb67 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java @@ -481,11 +481,6 @@ public class CompactionAdminClient extends AbstractHoodieClient { throw new HoodieException("FileGroupId " + fgId + " not in pending compaction"); } - @Override - protected void initWrapperFSMetrics() { - // no-op - } - /** * Holds Operation result for Renaming. */ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index 2a147f7..80724c8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -88,11 +88,6 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> { private final HoodieTable<T, I, K, O> table; private final HoodieTableMetaClient metaClient; - /* - public HoodieTimelineArchiveLog(HoodieWriteConfig config, Configuration configuration) { - this(config, HoodieTable.create(config, configuration)); - }*/ - public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) { this.config = config; this.table = table; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index ec98155..9a22f78 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -44,7 +44,7 @@ import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndex; import org.apache.hudi.metrics.DistributedRegistry; -import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java similarity index 96% rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/SparkHoodieBackedTableMetadataWriter.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 950144b..262ad0e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hudi.metrics; +package org.apache.hudi.metadata; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -35,10 +35,7 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; -import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; -import org.apache.hudi.metadata.HoodieMetadataMetrics; -import org.apache.hudi.metadata.HoodieTableMetadataWriter; -import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metrics.DistributedRegistry; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index 313eda2..34c0a35 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -18,24 +18,6 @@ package org.apache.hudi.metadata; -import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; - import 
org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -66,10 +48,12 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.metrics.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -77,8 +61,24 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class TestHoodieBackedMetadata extends HoodieClientTestHarness { private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class); @@ -172,13 +172,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Test various table operations sync to Metadata Table 
correctly. */ - //@ParameterizedTest - //@EnumSource(HoodieTableType.class) - //public void testTableOperations(HoodieTableType tableType) throws Exception { - @Test - public void testTableOperations() throws Exception { - //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed - init(HoodieTableType.COPY_ON_WRITE); + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testTableOperations(HoodieTableType tableType) throws Exception { + init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { @@ -281,7 +278,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { assertNoWriteErrors(writeStatuses); validateMetadata(client); - // Rollback of inserts + // Write 2 (inserts) + Rollback of inserts newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); records = dataGen.generateInserts(newCommitTime, 20); @@ -292,7 +289,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { client.syncTableMetadata(); validateMetadata(client); - // Rollback of updates + // Write 3 (updates) + Rollback of updates newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); records = dataGen.generateUniqueUpdates(newCommitTime, 20); @@ -341,7 +338,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { client.rollback(newCommitTime); client.syncTableMetadata(); validateMetadata(client); - } // Rollback of partial commits @@ -411,13 +407,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Test sync of table operations. 
*/ - //@ParameterizedTest - //@EnumSource(HoodieTableType.class) - //public void testSync(HoodieTableType tableType) throws Exception { - @Test - public void testSync() throws Exception { - //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed - init(HoodieTableType.COPY_ON_WRITE); + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testSync(HoodieTableType tableType) throws Exception { + init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); String newCommitTime; @@ -453,6 +446,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } // Various table operations without metadata table enabled + String restoreToInstant; try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { // updates newCommitTime = HoodieActiveTimeline.createNewInstantTime(); @@ -479,7 +473,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } // Savepoint - String savepointInstant = newCommitTime; + restoreToInstant = newCommitTime; if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { client.savepoint("hoodie", "metadata test"); assertFalse(metadata(client).isInSync()); @@ -505,21 +499,20 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); assertNoWriteErrors(writeStatuses); assertFalse(metadata(client).isInSync()); - - client.restoreToInstant(savepointInstant); - assertFalse(metadata(client).isInSync()); } - // Enable metadata table and ensure it is synced try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { + // Restore cannot be done until the metadata table is in sync. 
See HUDI-1502 for details + client.restoreToInstant(restoreToInstant); + assertFalse(metadata(client).isInSync()); + newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); validateMetadata(client); assertTrue(metadata(client).isInSync()); } - } /** @@ -673,8 +666,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Test when reading from metadata table which is out of sync with dataset that results are still consistent. */ - // @ParameterizedTest - // @EnumSource(HoodieTableType.class) @Test public void testMetadataOutOfSync() throws Exception { init(HoodieTableType.COPY_ON_WRITE); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index d671ec8..2d638b4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -206,7 +206,7 @@ public class FSUtils { public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr) throws IOException { // If the basePathStr is a folder within the .hoodie directory then we are listing partitions within an // internal table. 
- final boolean isMetadataTable = basePathStr.contains(HoodieTableMetaClient.METAFOLDER_NAME); + final boolean isMetadataTable = HoodieTableMetadata.isMetadataTable(basePathStr); final Path basePath = new Path(basePathStr); final List<String> partitions = new ArrayList<>(); processFiles(fs, basePathStr, (locatedFileStatus) -> { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index f62d9d8..33371da 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -270,10 +270,10 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { } HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); - List<HoodieInstant> unsyncedInstants = findInstantsToSync(datasetMetaClient); + List<HoodieInstant> unSyncedInstants = findInstantsToSync(datasetMetaClient); Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); timelineRecordScanner = - new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unsyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null); + new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null); } protected List<HoodieInstant> findInstantsToSync() { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 0886436..0863f7e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -171,7 +171,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata * Returns the list of filenames deleted as part of this record. */ public List<String> getDeletions() { - return filterFileInfoEntries(true).map(e -> e.getKey()).sorted().collect(Collectors.toList()); + return filterFileInfoEntries(true).map(Map.Entry::getKey).sorted().collect(Collectors.toList()); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 3017e82..115001a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -238,13 +238,20 @@ public class HoodieTableMetadataUtil { Option<String> lastSyncTs) { rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { + // Has this rollback produced new files? + boolean hasAppendFiles = pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0; // If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata - if (lastSyncTs.isPresent() && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get())) { + boolean shouldSkip = lastSyncTs.isPresent() + && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get()); + + if (!hasAppendFiles && shouldSkip) { + LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, given metadata table is already synced up to %s", + rollbackMetadata.getCommitsRollback().get(0), lastSyncTs.get())); return; } final String partition = pm.getPartitionPath(); - if (!pm.getSuccessDeleteFiles().isEmpty()) { + if (!pm.getSuccessDeleteFiles().isEmpty() && !shouldSkip) { if (!partitionToDeletedFiles.containsKey(partition)) { partitionToDeletedFiles.put(partition, new ArrayList<>()); }
