This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/rfc-15 by this push:
new 11661dc  [HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (#2342)
11661dc is described below
commit 11661dc6aa03a8b8d39f823f9991ed8402a35ef1
Author: rmpifer <[email protected]>
AuthorDate: Mon Dec 28 09:52:02 2020 -0800
[HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (#2342)
Co-authored-by: Ryan Pifer <[email protected]>
---
.../metadata/HoodieBackedTableMetadataWriter.java | 227 +--------------
.../hudi/client/TestCompactionAdminClient.java | 6 +
...Metadata.java => TestHoodieBackedMetadata.java} | 158 +++++++++--
.../hudi/table/upgrade/TestUpgradeDowngrade.java | 6 +
.../hudi/testutils/HoodieClientTestHarness.java | 7 +-
.../hudi/metadata/AbstractHoodieTableMetadata.java | 303 ++++++++++++++++++++
.../hudi/metadata/HoodieBackedTableMetadata.java | 205 +------------
.../HoodieMetadataMergedInstantRecordScanner.java | 115 ++++++++
.../hudi/metadata/HoodieTableMetadataUtil.java | 316 +++++++++++++++++++++
9 files changed, 906 insertions(+), 437 deletions(-)
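
At a glance: metadata table reads now merge, on the fly, records derived from dataset instants that have not yet been synced to the metadata table. A condensed sketch of the new merged-read step, abridged from AbstractHoodieTableMetadata.getMergedRecordByKey() in the diff below (scanner setup and error handling elided):

    // The timeline-derived record is combined with the stored one via
    // preCombine(), so readers stay consistent even when the metadata
    // table lags the dataset.
    Option<HoodieRecord<HoodieMetadataPayload>> fromMetadata = getRecordByKeyFromMetadata(key);
    Option<HoodieRecord<HoodieMetadataPayload>> fromTimeline = timelineRecordScanner.getRecordByKey(key);
    if (fromTimeline.isPresent() && fromMetadata.isPresent()) {
      HoodieRecordPayload merged = fromTimeline.get().getData().preCombine(fromMetadata.get().getData());
      return Option.of(new HoodieRecord(fromMetadata.get().getKey(), merged));
    }
    return fromTimeline.isPresent() ? fromTimeline : fromMetadata;
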
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f89a198..c3ba2a9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -42,13 +42,10 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
-import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -56,7 +53,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metrics.DistributedRegistry;
@@ -72,18 +68,14 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
-import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
/**
@@ -232,7 +224,7 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
return metadataWriteConfig;
}
- public HoodieTableMetadata metadata() {
+ public HoodieBackedTableMetadata metadata() {
return metadata;
}
@@ -413,38 +405,12 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
     LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync);
     // Read each instant in order and sync it to metadata table
-    final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
     for (HoodieInstant instant : instantsToSync) {
       LOG.info("Syncing instant " + instant + " to metadata table");
-      ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");
-
-      switch (instant.getAction()) {
-        case HoodieTimeline.CLEAN_ACTION:
-          HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
-          update(cleanMetadata, instant.getTimestamp());
-          break;
-        case HoodieTimeline.DELTA_COMMIT_ACTION:
-        case HoodieTimeline.COMMIT_ACTION:
-        case HoodieTimeline.COMPACTION_ACTION:
-          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
-              timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-          update(commitMetadata, instant.getTimestamp());
-          break;
-        case HoodieTimeline.ROLLBACK_ACTION:
-          HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
-              timeline.getInstantDetails(instant).get());
-          update(rollbackMetadata, instant.getTimestamp());
-          break;
-        case HoodieTimeline.RESTORE_ACTION:
-          HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
-              timeline.getInstantDetails(instant).get());
-          update(restoreMetadata, instant.getTimestamp());
-          break;
-        case HoodieTimeline.SAVEPOINT_ACTION:
-          // Nothing to be done here
-          break;
-        default:
-          throw new HoodieException("Unknown type of action " + instant.getAction());
+
+      Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, metadata.getSyncedInstantTime());
+      if (records.isPresent()) {
+        commit(jsc, prepRecords(jsc, records.get(), MetadataPartitionType.FILES.partitionPath()), instant.getTimestamp());
       }
     }
     // re-init the table metadata, for any future writes.
@@ -466,39 +432,7 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
return;
}
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> allPartitions = new LinkedList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
-      final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName;
-      allPartitions.add(partition);
-
-      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
-      writeStats.forEach(hoodieWriteStat -> {
-        String pathWithPartition = hoodieWriteStat.getPath();
-        if (pathWithPartition == null) {
-          // Empty partition
-          LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
-          return;
-        }
-
-        int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
-        String filename = pathWithPartition.substring(offset);
-        ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata");
-        newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
-      });
-
-      // New files added to a partition
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
-          partition, Option.of(newFiles), Option.empty());
-      records.add(record);
-    });
-
-    // New partitions created
-    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions));
-    records.add(record);
-
-    LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
-        + ". #partitions_updated=" + records.size());
+    List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime);
     commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
@@ -514,21 +448,7 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
return;
}
-    List<HoodieRecord> records = new LinkedList<>();
-    int[] fileDeleteCount = {0};
-    cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> {
-      fileDeleteCount[0] += deletedPathInfo.size();
-
-      // Files deleted from a partition
-      List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName())
-          .collect(Collectors.toList());
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
-          Option.of(deletedFilenames));
-      records.add(record);
-    });
-
-    LOG.info("Updating at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileDeleteCount[0]);
+    List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(cleanerPlan, instantTime);
     commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
@@ -544,21 +464,7 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
return;
}
-    List<HoodieRecord> records = new LinkedList<>();
-    int[] fileDeleteCount = {0};
-
-    cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
-      // Files deleted from a partition
-      List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles();
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
-          Option.of(new ArrayList<>(deletedFiles)));
-
-      records.add(record);
-      fileDeleteCount[0] += deletedFiles.size();
-    });
-
-    LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileDeleteCount[0]);
+    List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime);
     commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
@@ -574,12 +480,8 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
return;
}
-    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
-    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
-    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
-      rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles));
-    });
-    commitRollback(jsc, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore");
+    List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(restoreMetadata, instantTime, metadata.getSyncedInstantTime());
+    commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
/**
@@ -594,114 +496,7 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
return;
}
-    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
-    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
-    processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles);
-    commitRollback(jsc, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback");
-  }
-
-  /**
-   * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}.
-   *
-   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This
-   * function will extract this change file for each partition.
-   *
-   * @param rollbackMetadata {@code HoodieRollbackMetadata}
-   * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
-   * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
-   */
-  private void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
-                                       Map<String, List<String>> partitionToDeletedFiles,
-                                       Map<String, Map<String, Long>> partitionToAppendedFiles) {
-    rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
-      final String partition = pm.getPartitionPath();
-
-      if (!pm.getSuccessDeleteFiles().isEmpty()) {
-        if (!partitionToDeletedFiles.containsKey(partition)) {
-          partitionToDeletedFiles.put(partition, new ArrayList<>());
-        }
-
-        // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles()
-        List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName())
-            .collect(Collectors.toList());
-        partitionToDeletedFiles.get(partition).addAll(deletedFiles);
-      }
-
-      if (!pm.getAppendFiles().isEmpty()) {
-        if (!partitionToAppendedFiles.containsKey(partition)) {
-          partitionToAppendedFiles.put(partition, new HashMap<>());
-        }
-
-        // Extract appended file name from the absolute paths saved in getAppendFiles()
-        pm.getAppendFiles().forEach((path, size) -> {
-          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> {
-            return size + oldSize;
-          });
-        });
-      }
-    });
-  }
-
-  /**
-   * Create file delete records and commit.
-   *
-   * @param partitionToDeletedFiles {@code Map} of partitions and the deleted files
-   * @param instantTime Timestamp at which the deletes took place
-   * @param operation Type of the operation which caused the files to be deleted
-   */
-  private void commitRollback(JavaSparkContext jsc, Map<String, List<String>> partitionToDeletedFiles,
-                              Map<String, Map<String, Long>> partitionToAppendedFiles, String instantTime,
-                              String operation) {
-    List<HoodieRecord> records = new LinkedList<>();
-    int[] fileChangeCount = {0, 0}; // deletes, appends
-
-    partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
-      // Rollbacks deletes instants from timeline. The instant being rolled-back may not have been synced to the
-      // metadata table. Hence, the deleted filed need to be checked against the metadata.
-      try {
-        FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new Path(metadata.getDatasetBasePath(), partition));
-        Set<String> currentFiles =
-            Arrays.stream(existingStatuses).map(s -> s.getPath().getName()).collect(Collectors.toSet());
-
-        int origCount = deletedFiles.size();
-        deletedFiles.removeIf(f -> !currentFiles.contains(f));
-        if (deletedFiles.size() != origCount) {
-          LOG.warn("Some Files to be deleted as part of " + operation + " at " + instantTime + " were not found in the "
-              + " metadata for partition " + partition
-              + ". To delete = " + origCount + ", found=" + deletedFiles.size());
-        }
-
-        fileChangeCount[0] += deletedFiles.size();
-
-        Option<Map<String, Long>> filesAdded = Option.empty();
-        if (partitionToAppendedFiles.containsKey(partition)) {
-          filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
-        }
-
-        HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
-            Option.of(new ArrayList<>(deletedFiles)));
-        records.add(record);
-      } catch (IOException e) {
-        throw new HoodieMetadataException("Failed to commit rollback deletes at instant " + instantTime, e);
-      }
-    });
-
-    partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
-      fileChangeCount[1] += appendedFileMap.size();
-
-      // Validate that no appended file has been deleted
-      ValidationUtils.checkState(
-          !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())),
-          "Rollback file cannot both be appended and deleted");
-
-      // New files added to a partition
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap),
-          Option.empty());
-      records.add(record);
-    });
-
-    LOG.info("Updating at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]);
+    List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime, metadata.getSyncedInstantTime());
     commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
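
The switch removed above now lives behind HoodieTableMetadataUtil.convertInstantToMetaRecords (that utility file is added by this commit, but its hunks are not shown in this email). A sketch of what it plausibly does, reconstructed from the deleted branches and the convertMetadataToRecords() calls elsewhere in this diff; treat the body as an assumption, not the committed implementation:

    // Hypothetical reconstruction of HoodieTableMetadataUtil.convertInstantToMetaRecords().
    public static Option<List<HoodieRecord>> convertInstantToMetaRecords(
        HoodieTableMetaClient datasetMetaClient, HoodieInstant instant, Option<String> lastSyncTs) throws IOException {
      HoodieTimeline timeline = datasetMetaClient.getActiveTimeline();
      Option<List<HoodieRecord>> records = Option.empty();
      ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");
      switch (instant.getAction()) {
        case HoodieTimeline.CLEAN_ACTION:
          HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
          records = Option.of(convertMetadataToRecords(cleanMetadata, instant.getTimestamp()));
          break;
        case HoodieTimeline.DELTA_COMMIT_ACTION:
        case HoodieTimeline.COMMIT_ACTION:
        case HoodieTimeline.COMPACTION_ACTION:
          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
              timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
          records = Option.of(convertMetadataToRecords(commitMetadata, instant.getTimestamp()));
          break;
        case HoodieTimeline.ROLLBACK_ACTION:
          HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
              timeline.getInstantDetails(instant).get());
          records = Option.of(convertMetadataToRecords(rollbackMetadata, instant.getTimestamp(), lastSyncTs));
          break;
        case HoodieTimeline.RESTORE_ACTION:
          HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
              timeline.getInstantDetails(instant).get());
          records = Option.of(convertMetadataToRecords(restoreMetadata, instant.getTimestamp(), lastSyncTs));
          break;
        case HoodieTimeline.SAVEPOINT_ACTION:
          // Nothing to sync for savepoints
          break;
        default:
          throw new HoodieException("Unknown type of action " + instant.getAction());
      }
      return records;
    }

The lastSyncTs parameter lets rollback/restore conversion skip instants that were never synced in the first place, which is what makes rolling back an unsynced commit a no-op for the metadata table (see testRollbackUnsyncedCommit below).
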
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 1200f67..c42110e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -37,6 +37,7 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -70,6 +71,11 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
client = new CompactionAdminClient(jsc, basePath);
}
+ @AfterEach
+ public void cleanUp() throws Exception {
+ cleanupResources();
+ }
+
@Test
public void testUnscheduleCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
similarity index 86%
rename from hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
rename to hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index 2823035..48d07e5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -73,10 +73,11 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
-public class TestHoodieFsMetadata extends HoodieClientTestHarness {
-  private static final Logger LOG = LogManager.getLogger(TestHoodieFsMetadata.class);
+public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
+  private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);
@TempDir
public java.nio.file.Path tempFolder;
@@ -91,7 +92,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
initSparkContexts("TestHoodieMetadata");
initFileSystem();
fs.mkdirs(new Path(basePath));
- initMetaClient();
+ initMetaClient(tableType);
initTestDataGenerator();
     metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
}
@@ -168,6 +169,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
   //@ParameterizedTest
   //@EnumSource(HoodieTableType.class)
   //public void testTableOperations(HoodieTableType tableType) throws Exception {
+ @Test
public void testTableOperations() throws Exception {
//FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
init(HoodieTableType.COPY_ON_WRITE);
@@ -257,6 +259,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
   //@ParameterizedTest
   //@EnumSource(HoodieTableType.class)
   //public void testRollbackOperations(HoodieTableType tableType) throws Exception {
+ @Test
public void testRollbackOperations() throws Exception {
//FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
init(HoodieTableType.COPY_ON_WRITE);
@@ -364,6 +367,40 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
}
/**
+   * Test that syncing a rollback to the metadata table is essentially a no-op
+   * when the commit being rolled back was never synced to the metadata table.
+   * @throws Exception
+   */
+ @Test
+ public void testRollbackUnsyncedCommit() throws Exception {
+ init(HoodieTableType.COPY_ON_WRITE);
+
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, true))) {
+      // Initialize table with metadata
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+    }
+
+    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) {
+      // Commit with metadata disabled
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.rollback(newCommitTime);
+    }
+
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, true))) {
+      validateMetadata(client);
+    }
+  }
+
+ /**
* Test sync of table operations.
*/
//@ParameterizedTest
@@ -472,7 +509,6 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
validateMetadata(client);
assertTrue(metadata(client).isInSync());
}
-
}
/**
@@ -620,31 +656,100 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
}
/**
+   * Test that reading from a metadata table which is out of sync with the
+   * dataset still returns consistent results.
+   */
+ // @ParameterizedTest
+ // @EnumSource(HoodieTableType.class)
+ @Test
+ public void testMetadataOutOfSync() throws Exception {
+ init(HoodieTableType.COPY_ON_WRITE);
+
+    HoodieWriteClient unsyncedClient = new HoodieWriteClient<>(jsc, getWriteConfig(true, true));
+
+ // Enable metadata so table is initialized
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, true))) {
+ // Perform Bulk Insert
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+ client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+ }
+
+ // Perform commit operations with metadata disabled
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) {
+ // Perform Insert
+ String newCommitTime = "002";
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+ client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+ // Perform Upsert
+ newCommitTime = "003";
+ client.startCommitWithTime(newCommitTime);
+ records = dataGen.generateUniqueUpdates(newCommitTime, 20);
+ client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+ // Compaction
+ if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+ newCommitTime = "004";
+ client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
+ client.compact(newCommitTime);
+ }
+ }
+
+ assertFalse(metadata(unsyncedClient).isInSync());
+ validateMetadata(unsyncedClient);
+
+ // Perform clean operation with metadata disabled
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) {
+      // One more commit is needed to trigger the clean, so upsert and compact
+ String newCommitTime = "005";
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 20);
+ client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+ if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+ newCommitTime = "006";
+ client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
+ client.compact(newCommitTime);
+ }
+
+ // Clean
+ newCommitTime = "007";
+ client.clean(newCommitTime);
+ }
+
+ assertFalse(metadata(unsyncedClient).isInSync());
+ validateMetadata(unsyncedClient);
+
+ // Perform restore with metadata disabled
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) {
+ client.restoreToInstant("004");
+ }
+
+ assertFalse(metadata(unsyncedClient).isInSync());
+ validateMetadata(unsyncedClient);
+ }
+
+ /**
   * Validate the metadata table's contents to ensure they match what is on the file system.
*
* @throws IOException
*/
private void validateMetadata(HoodieWriteClient client) throws IOException {
HoodieWriteConfig config = client.getConfig();
-    HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
-    assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
+
+    HoodieBackedTableMetadata tableMetadata = metadata(client);
+    assertNotNull(tableMetadata, "MetadataReader should have been initialized");
     if (!config.useFileListingMetadata()) {
       return;
     }
     HoodieTimer timer = new HoodieTimer().startTimer();
-    // Validate write config for metadata table
-    HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
-    assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table");
-    assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table");
-
-    // Metadata table should be in sync with the dataset
-    assertTrue(metadata(client).isInSync());
-
     // Partitions should match
     List<String> fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(fs, basePath);
-    List<String> metadataPartitions = metadataWriter.metadata().getAllPartitionPaths();
+    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
Collections.sort(fsPartitions);
Collections.sort(metadataPartitions);
@@ -665,8 +770,9 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
} else {
partitionPath = new Path(basePath, partition);
}
+
      FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, partitionPath);
-      FileStatus[] metaStatuses = metadataWriter.metadata().getAllFilesInPartition(partitionPath);
+      FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath);
List<String> fsFileNames = Arrays.stream(fsStatuses)
.map(s -> s.getPath().getName()).collect(Collectors.toList());
List<String> metadataFilenames = Arrays.stream(metaStatuses)
@@ -687,9 +793,9 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
// FileSystemView should expose the same data
      List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
-      fileGroups.forEach(g -> LogManager.getLogger(TestHoodieFsMetadata.class).info(g));
-      fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieFsMetadata.class).info(b)));
-      fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieFsMetadata.class).info(s)));
+      fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
+      fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));
+      fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(s)));
long numFiles = fileGroups.stream()
          .mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
@@ -702,10 +808,18 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
}
});
-    HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
+    HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
+    assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
+
+    // Validate write config for metadata table
+    HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
+    assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table");
+    assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table");
     // Metadata table should be in sync with the dataset
-    assertTrue(metadataWriter.metadata().isInSync());
+    assertTrue(metadata(client).isInSync());
+
+    HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
// Metadata table is MOR
    assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
@@ -742,7 +856,7 @@ public class TestHoodieFsMetadata extends HoodieClientTestHarness {
private HoodieBackedTableMetadata metadata(HoodieWriteClient client) {
HoodieWriteConfig clientConfig = client.getConfig();
-    return (HoodieBackedTableMetadata) HoodieTableMetadata.create(hadoopConf, clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(),
+    return (HoodieBackedTableMetadata) AbstractHoodieTableMetadata.create(hadoopConf, clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(),
         clientConfig.useFileListingMetadata(), clientConfig.getFileListingMetadataVerify(), false, clientConfig.shouldAssumeDatePartitioning());
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index e1dc4ce..da5f434 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -91,6 +92,11 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
initDFSMetaClient();
}
+ @AfterEach
+ public void cleanUp() throws Exception {
+ cleanupResources();
+ }
+
@Test
public void testLeftOverUpdatedPropFileCleanup() throws IOException {
testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ);
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index f1e3f17..cf49174 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -21,6 +21,7 @@ import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -189,6 +190,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
* @throws IOException
*/
protected void initMetaClient() throws IOException {
+ initMetaClient(getTableType());
+ }
+
+ protected void initMetaClient(HoodieTableType tableType) throws IOException {
if (basePath == null) {
throw new IllegalStateException("The base path has not been
initialized.");
}
@@ -197,7 +202,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
throw new IllegalStateException("The Spark context has not been
initialized.");
}
- metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType());
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
new file mode 100644
index 0000000..15b9ebf
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract base class that supports querying various pieces of metadata about a hudi table.
+ */
+public abstract class AbstractHoodieTableMetadata implements HoodieTableMetadata {
+
+  private static final Logger LOG = LogManager.getLogger(AbstractHoodieTableMetadata.class);
+
+ static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+ static final int BUFFER_SIZE = 10 * 1024 * 1024;
+
+ protected final SerializableConfiguration hadoopConf;
+ protected final Option<HoodieMetadataMetrics> metrics;
+ protected final String datasetBasePath;
+
+ // Directory used for Spillable Map when merging records
+ final String spillableMapDirectory;
+
+ protected boolean enabled;
+ private final boolean validateLookups;
+ private final boolean assumeDatePartitioning;
+
+  private transient HoodieMetadataMergedInstantRecordScanner timelineRecordScanner;
+
+  protected AbstractHoodieTableMetadata(Configuration hadoopConf, String datasetBasePath, String spillableMapDirectory,
+                                        boolean enabled, boolean validateLookups, boolean enableMetrics,
+                                        boolean assumeDatePartitioning) {
+ this.hadoopConf = new SerializableConfiguration(hadoopConf);
+ this.datasetBasePath = datasetBasePath;
+ this.spillableMapDirectory = spillableMapDirectory;
+
+ this.enabled = enabled;
+ this.validateLookups = validateLookups;
+ this.assumeDatePartitioning = assumeDatePartitioning;
+
+ if (enableMetrics) {
+      this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
+ } else {
+ this.metrics = Option.empty();
+ }
+ }
+
+  public static AbstractHoodieTableMetadata create(Configuration conf, String datasetBasePath, String spillableMapPath, boolean useFileListingFromMetadata,
+                                                   boolean verifyListings, boolean enableMetrics, boolean shouldAssumeDatePartitioning) {
+    return new HoodieBackedTableMetadata(conf, datasetBasePath, spillableMapPath, useFileListingFromMetadata, verifyListings,
+        enableMetrics, shouldAssumeDatePartitioning);
+  }
+
+
+  /**
+   * Return the list of files in a partition.
+   *
+   * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the
+   * listing is retrieved directly from the underlying {@code FileSystem}.
+   *
+   * On any errors retrieving the listing from the metadata, defaults to using the file system listings.
+   *
+   * @param partitionPath The absolute path of the partition to list
+   */
+  public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException {
+    if (enabled) {
+      try {
+        return fetchAllFilesInPartition(partitionPath);
+      } catch (Exception e) {
+        LOG.error("Failed to retrieve files in partition " + partitionPath + " from metadata", e);
+      }
+    }
+    return FSUtils.getFs(partitionPath.toString(), hadoopConf.get()).listStatus(partitionPath);
+  }
+
+  /**
+   * Return the list of partitions in the dataset.
+   *
+   * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of
+   * partitions is retrieved directly from the underlying {@code FileSystem}.
+   *
+   * On any errors retrieving the listing from the metadata, defaults to using the file system listings.
+   */
+  public List<String> getAllPartitionPaths() throws IOException {
+    if (enabled) {
+      try {
+        return fetchAllPartitionPaths();
+      } catch (Exception e) {
+        LOG.error("Failed to retrieve list of partitions from metadata", e);
+      }
+    }
+
+    return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths();
+  }
+
+  /**
+   * Returns a list of all partitions.
+   */
+  protected List<String> fetchAllPartitionPaths() throws IOException {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
+
+    List<String> partitions = Collections.emptyList();
+    if (hoodieRecord.isPresent()) {
+      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
+        throw new HoodieMetadataException("Metadata partition list record is inconsistent: "
+            + hoodieRecord.get().getData());
+      }
+
+      partitions = hoodieRecord.get().getData().getFilenames();
+      // Partition-less tables have a single empty partition
+      if (partitions.contains(NON_PARTITIONED_NAME)) {
+        partitions.remove(NON_PARTITIONED_NAME);
+        partitions.add("");
+      }
+    }
+
+    if (validateLookups) {
+      // Validate the Metadata Table data by listing the partitions from the file system
+      timer.startTimer();
+      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning);
+      List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));
+
+      Collections.sort(actualPartitions);
+      Collections.sort(partitions);
+      if (!actualPartitions.equals(partitions)) {
+        LOG.error("Validation of metadata partition list failed. Lists do not match.");
+        LOG.error("Partitions from metadata: " + Arrays.toString(partitions.toArray()));
+        LOG.error("Partitions from file system: " + Arrays.toString(actualPartitions.toArray()));
+
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
+      }
+
+      // Return the direct listing as it should be correct
+      partitions = actualPartitions;
+    }
+
+    LOG.info("Listed partitions from metadata: #partitions=" + partitions.size());
+    return partitions;
+  }
+
+  /**
+   * Return all the files from the partition.
+   *
+   * @param partitionPath The absolute path of the partition
+   */
+  private FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException {
+    String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), partitionPath);
+    if (partitionName.isEmpty()) {
+      partitionName = NON_PARTITIONED_NAME;
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(partitionName);
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
+
+    FileStatus[] statuses = {};
+    if (hoodieRecord.isPresent()) {
+      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
+        throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: "
+            + hoodieRecord.get().getData());
+      }
+      statuses = hoodieRecord.get().getData().getFileStatuses(partitionPath);
+    }
+
+    if (validateLookups) {
+      // Validate the Metadata Table data by listing the partitions from the file system
+      timer.startTimer();
+
+      // Ignore partition metadata file
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
+      FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath,
+          p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
+
+      List<String> directFilenames = Arrays.stream(directStatuses)
+          .map(s -> s.getPath().getName()).sorted()
+          .collect(Collectors.toList());
+
+      List<String> metadataFilenames = Arrays.stream(statuses)
+          .map(s -> s.getPath().getName()).sorted()
+          .collect(Collectors.toList());
+
+      if (!metadataFilenames.equals(directFilenames)) {
+        LOG.error("Validation of metadata file listing for partition " + partitionName + " failed.");
+        LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray()));
+        LOG.error("File list from direct listing: " + Arrays.toString(directFilenames.toArray()));
+
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
+      }
+
+      // Return the direct listing as it should be correct
+      statuses = directStatuses;
+    }
+
+    LOG.info("Listed files in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length);
+    return statuses;
+  }
+
+  /**
+   * Retrieve the merged {@code HoodieRecord} mapped to the given key.
+   *
+   * @param key The key of the record
+   */
+  private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) throws IOException {
+    Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord;
+    openTimelineScanner();
+
+    Option<HoodieRecord<HoodieMetadataPayload>> metadataHoodieRecord = getRecordByKeyFromMetadata(key);
+    // Retrieve record from unsynced timeline instants
+    Option<HoodieRecord<HoodieMetadataPayload>> timelineHoodieRecord = timelineRecordScanner.getRecordByKey(key);
+    if (timelineHoodieRecord.isPresent()) {
+      if (metadataHoodieRecord.isPresent()) {
+        HoodieRecordPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(metadataHoodieRecord.get().getData());
+        mergedRecord = Option.of(new HoodieRecord(metadataHoodieRecord.get().getKey(), mergedPayload));
+      } else {
+        mergedRecord = timelineHoodieRecord;
+      }
+    } else {
+      mergedRecord = metadataHoodieRecord;
+    }
+    return mergedRecord;
+  }
+
+  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException;
+
+  private void openTimelineScanner() throws IOException {
+    if (timelineRecordScanner != null) {
+      // Already opened
+      return;
+    }
+
+    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
+    List<HoodieInstant> unsyncedInstants = findInstantsToSync(datasetMetaClient);
+    Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+    timelineRecordScanner =
+        new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unsyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null);
+  }
+
+  protected List<HoodieInstant> findInstantsToSync() {
+    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
+    return findInstantsToSync(datasetMetaClient);
+  }
+
+  protected abstract List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient);
+
+ public boolean isInSync() {
+ return enabled && findInstantsToSync().isEmpty();
+ }
+
+ protected void closeReaders() {
+ timelineRecordScanner = null;
+ }
+
+}
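
The split above is a template method: the abstract base owns the merge with unsynced instants (getMergedRecordByKey), while subclasses supply storage access via getRecordByKeyFromMetadata and findInstantsToSync. A hypothetical caller goes through the static factory; the paths and flags below are illustrative only, not from this commit:

    // Illustrative usage only; the path and flag values are made up.
    AbstractHoodieTableMetadata tableMetadata = AbstractHoodieTableMetadata.create(
        hadoopConf, "/data/my_table", "/tmp/spillable",
        true,   // useFileListingFromMetadata
        false,  // verifyListings
        false,  // enableMetrics
        false); // shouldAssumeDatePartitioning

    // Listings are served from the metadata table with unsynced instants
    // merged in, and fall back to direct file system listings on error.
    List<String> partitions = tableMetadata.getAllPartitionPaths();
    FileStatus[] files = tableMetadata.getAllFilesInPartition(new Path("/data/my_table/2020/12/28"));
    boolean inSync = tableMetadata.isInSync(); // may be false; the reads above are still consistent
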
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 3a1c7bf..bf5aa76 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -19,8 +19,6 @@
package org.apache.hudi.metadata;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,17 +27,13 @@ import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.SerializableConfiguration;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -52,7 +46,6 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -65,24 +58,13 @@ import org.apache.log4j.Logger;
 * If the metadata table does not exist, RPC calls are used to retrieve file listings from the file system.
 * No updates are applied to the table and it is not synced.
 */
-public class HoodieBackedTableMetadata implements HoodieTableMetadata {
+public class HoodieBackedTableMetadata extends AbstractHoodieTableMetadata {
   private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);
- private static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
- private static final int BUFFER_SIZE = 10 * 1024 * 1024;
- private final SerializableConfiguration hadoopConf;
- private final String datasetBasePath;
private final String metadataBasePath;
- private final Option<HoodieMetadataMetrics> metrics;
private HoodieTableMetaClient metaClient;
- private boolean enabled;
- private final boolean validateLookups;
- private final boolean assumeDatePartitioning;
- // Directory used for Spillable Map when merging records
- private final String spillableMapDirectory;
-
// Readers for the base and log file which store the metadata
private transient HoodieFileReader<GenericRecord> baseFileReader;
private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
@@ -95,13 +77,8 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
   public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory,
                                    boolean enabled, boolean validateLookups, boolean enableMetrics,
                                    boolean assumeDatePartitioning) {
-    this.hadoopConf = new SerializableConfiguration(conf);
-    this.datasetBasePath = datasetBasePath;
+    super(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, enableMetrics, assumeDatePartitioning);
     this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
-    this.validateLookups = validateLookups;
-    this.spillableMapDirectory = spillableMapDirectory;
-    this.enabled = enabled;
-    this.assumeDatePartitioning = assumeDatePartitioning;
if (enabled) {
try {
@@ -116,164 +93,6 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
} else {
LOG.info("Metadata table is disabled.");
}
-
- if (enableMetrics) {
-      this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
- } else {
- this.metrics = Option.empty();
- }
- }
-
-  /**
-   * Return the list of partitions in the dataset.
-   *
-   * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of
-   * partitions is retrieved directly from the underlying {@code FileSystem}.
-   *
-   * On any errors retrieving the listing from the metadata, defaults to using the file system listings.
-   *
-   */
-  @Override
-  public List<String> getAllPartitionPaths()
-      throws IOException {
-    if (enabled) {
-      try {
-        return fetchAllPartitionPaths();
-      } catch (Exception e) {
-        LOG.error("Failed to retrieve list of partition from metadata", e);
-      }
-    }
-
-    return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths();
-  }
-
-  /**
-   * Return the list of files in a partition.
-   *
-   * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of
-   * partitions is retrieved directly from the underlying {@code FileSystem}.
-   *
-   * On any errors retrieving the listing from the metadata, defaults to using the file system listings.
-   *
-   * @param partitionPath The absolute path of the partition to list
-   */
-  @Override
-  public FileStatus[] getAllFilesInPartition(Path partitionPath)
-      throws IOException {
-    if (enabled) {
-      try {
-        return fetchAllFilesInPartition(partitionPath);
-      } catch (Exception e) {
-        LOG.error("Failed to retrive files in partition " + partitionPath + " from metadata", e);
-      }
-    }
-
-    return FSUtils.getFs(partitionPath.toString(), hadoopConf.get()).listStatus(partitionPath);
-  }
-
-  /**
-   * Returns a list of all partitions.
-   */
-  protected List<String> fetchAllPartitionPaths() throws IOException {
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
-    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
-
-    List<String> partitions = Collections.emptyList();
-    if (hoodieRecord.isPresent()) {
-      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
-        throw new HoodieMetadataException("Metadata partition list record is inconsistent: "
-            + hoodieRecord.get().getData());
-      }
-
-      partitions = hoodieRecord.get().getData().getFilenames();
-      // Partition-less tables have a single empty partition
-      if (partitions.contains(NON_PARTITIONED_NAME)) {
-        partitions.remove(NON_PARTITIONED_NAME);
-        partitions.add("");
-      }
-    }
-
-    if (validateLookups) {
-      // Validate the Metadata Table data by listing the partitions from the file system
-      timer.startTimer();
-      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning);
-      List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
-      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));
-
-      Collections.sort(actualPartitions);
-      Collections.sort(partitions);
-      if (!actualPartitions.equals(partitions)) {
-        LOG.error("Validation of metadata partition list failed. Lists do not match.");
-        LOG.error("Partitions from metadata: " + Arrays.toString(partitions.toArray()));
-        LOG.error("Partitions from file system: " + Arrays.toString(actualPartitions.toArray()));
-
-        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
-      }
-
-      // Return the direct listing as it should be correct
-      partitions = actualPartitions;
-    }
-
-    LOG.info("Listed partitions from metadata: #partitions=" + partitions.size());
-    return partitions;
-  }
-
-  /**
-   * Return all the files from the partition.
-   *
-   * @param partitionPath The absolute path of the partition
-   */
-  FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException {
-    String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), partitionPath);
-    if (partitionName.isEmpty()) {
-      partitionName = NON_PARTITIONED_NAME;
-    }
-
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(partitionName);
-    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
-
-    FileStatus[] statuses = {};
-    if (hoodieRecord.isPresent()) {
-      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
-        throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: "
-            + hoodieRecord.get().getData());
-      }
-      statuses = hoodieRecord.get().getData().getFileStatuses(partitionPath);
-    }
-
-    if (validateLookups) {
-      // Validate the Metadata Table data by listing the partitions from the file system
-      timer.startTimer();
-
-      // Ignore partition metadata file
-      FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath,
-          p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
-
-      List<String> directFilenames = Arrays.stream(directStatuses)
-          .map(s -> s.getPath().getName()).sorted()
-          .collect(Collectors.toList());
-
-      List<String> metadataFilenames = Arrays.stream(statuses)
-          .map(s -> s.getPath().getName()).sorted()
-          .collect(Collectors.toList());
-
-      if (!metadataFilenames.equals(directFilenames)) {
-        LOG.error("Validation of metadata file listing for partition " + partitionName + " failed.");
-        LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray()));
-        LOG.error("File list from direct listing: " + Arrays.toString(directFilenames.toArray()));
-
-        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
-      }
-
-      // Return the direct listing as it should be correct
-      statuses = directStatuses;
-    }
-
-    LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length);
-    return statuses;
   }
/**
@@ -281,7 +100,8 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
    *
    * @param key The key of the record
    */
-  private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) throws IOException {
+  @Override
+  protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException {
     openBaseAndLogFiles();
     // Retrieve record from base file
@@ -314,7 +134,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
/**
* Open readers to the base and log files.
*/
- private synchronized void openBaseAndLogFiles() throws IOException {
+ protected synchronized void openBaseAndLogFiles() throws IOException {
if (logRecordScanner != null) {
// Already opened
return;
@@ -363,7 +183,9 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
     metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer()));
   }
+ @Override
protected void closeReaders() {
+ super.closeReaders();
if (baseFileReader != null) {
baseFileReader.close();
baseFileReader = null;
@@ -372,19 +194,6 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
}
   /**
-   * Return {@code True} if all Instants from the dataset have been synced with the Metadata Table.
-   */
-  @Override
-  public boolean isInSync() {
-    return enabled && findInstantsToSync().isEmpty();
-  }
-
-  private List<HoodieInstant> findInstantsToSync() {
-    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    return findInstantsToSync(datasetMetaClient);
-  }
-
-  /**
    * Return an ordered list of instants which have not been synced to the Metadata Table.
    * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java
new file mode 100644
index 0000000..c98f48c
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Provides functionality to convert timeline instants to table metadata records and merge them by key.
+ * A key filter may be supplied to limit which keys are merged and retained in memory.
+ */
+public class HoodieMetadataMergedInstantRecordScanner {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieMetadataMergedInstantRecordScanner.class);
+
+ HoodieTableMetaClient metaClient;
+ private List<HoodieInstant> instants;
+ private Option<String> lastSyncTs;
+ private Set<String> mergeKeyFilter;
+ protected final ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records;
+
+ public HoodieMetadataMergedInstantRecordScanner(HoodieTableMetaClient
metaClient, List<HoodieInstant> instants,
+ Option<String> lastSyncTs,
Schema readerSchema, Long maxMemorySizeInBytes,
+ String spillableMapBasePath,
Set<String> mergeKeyFilter) throws IOException {
+ this.metaClient = metaClient;
+ this.instants = instants;
+ this.lastSyncTs = lastSyncTs;
+ this.mergeKeyFilter = mergeKeyFilter != null ? mergeKeyFilter :
Collections.emptySet();
+ this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes,
spillableMapBasePath, new DefaultSizeEstimator(),
+ new HoodieRecordSizeEstimator(readerSchema));
+
+ scan();
+ }
+
+  /**
+   * Converts the instants in the scanner to metadata table records and processes each record.
+   */
+ private void scan() {
+ for (HoodieInstant instant : instants) {
+ try {
+        // Convert this instant's timeline metadata into metadata table records
+        Option<List<HoodieRecord>> instantRecords = HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient, instant, lastSyncTs);
+        if (instantRecords.isPresent()) {
+          instantRecords.get().forEach(this::processNextRecord);
+        }
+ } catch (Exception e) {
+ LOG.error(String.format("Got exception when processing timeline
instant %s", instant.getTimestamp()), e);
+ throw new HoodieException(String.format("Got exception when processing
timeline instant %s", instant.getTimestamp()), e);
+ }
+ }
+ }
+
+  /**
+   * Processes a metadata table record, merging it with any previously seen record for the same key,
+   * provided the key passes the merge key filter.
+   *
+   * @param hoodieRecord the record to merge and store
+   */
+ private void processNextRecord(HoodieRecord<? extends HoodieRecordPayload>
hoodieRecord) {
+ String key = hoodieRecord.getRecordKey();
+ if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(key)) {
+ if (records.containsKey(key)) {
+        // Combine the incoming record with the stored one via preCombine and store the result
+        HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
+        records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
+ } else {
+ // Put the record as is
+ records.put(key, hoodieRecord);
+ }
+ }
+ }
+
+  /**
+   * Retrieves the merged hoodie record for the given key.
+   *
+   * @param key the key of the record to retrieve
+   * @return the merged {@code HoodieRecord} if the key was found, else {@code Option.empty()}
+   */
+ public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String
key) {
+ return Option.ofNullable((HoodieRecord) records.get(key));
+ }
+}
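
A minimal usage sketch for the new scanner, assuming illustrative values for
the memory budget, spill path, and lookup key (the sketch itself is not part
of this commit):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.metadata.HoodieMetadataMergedInstantRecordScanner;
    import org.apache.hudi.metadata.HoodieMetadataPayload;

    public class MergedInstantLookupSketch {

      // Merge the given unsynced instants in memory, then look up one key.
      static Option<HoodieRecord<HoodieMetadataPayload>> lookupKey(
          HoodieTableMetaClient datasetMetaClient, List<HoodieInstant> unsyncedInstants,
          Option<String> lastSyncTs, Schema readerSchema, String key) throws IOException {
        HoodieMetadataMergedInstantRecordScanner scanner =
            new HoodieMetadataMergedInstantRecordScanner(
                datasetMetaClient, unsyncedInstants, lastSyncTs, readerSchema,
                1024 * 1024 * 1024L,            // example in-memory budget before spilling
                "/tmp/hoodie-metadata-spill",   // example spillable map base path
                Collections.singleton(key));    // merge and retain only the key we need
        return scanner.getRecordByKey(key);
      }
    }

Passing the lookup key as the merge key filter keeps only that key's merged
record in the spillable map, per the class javadoc above.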
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
new file mode 100644
index 0000000..9a25825
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
+
+/**
+ * A utility to convert timeline information to metadata table records.
+ */
+public class HoodieTableMetadataUtil {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieTableMetadataUtil.class);
+
+  /**
+   * Converts a timeline instant to metadata table records.
+   *
+   * @param datasetMetaClient The meta client associated with the timeline instant
+   * @param instant The instant to fetch and convert to metadata table records
+   * @param lastSyncTs The timestamp of the latest instant already synced to the metadata table
+   * @return a list of metadata table records
+   * @throws IOException on errors reading the instant details
+   */
+ public static Option<List<HoodieRecord>>
convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient,
HoodieInstant instant, Option<String> lastSyncTs) throws IOException {
+ HoodieTimeline timeline = datasetMetaClient.getActiveTimeline();
+ Option<List<HoodieRecord>> records = Option.empty();
+ ValidationUtils.checkArgument(instant.isCompleted(), "Only completed
instants can be synced.");
+
+ switch (instant.getAction()) {
+ case HoodieTimeline.CLEAN_ACTION:
+ HoodieCleanMetadata cleanMetadata =
CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
+ records = Option.of(convertMetadataToRecords(cleanMetadata,
instant.getTimestamp()));
+ break;
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.COMPACTION_ACTION:
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+ timeline.getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
+ records = Option.of(convertMetadataToRecords(commitMetadata,
instant.getTimestamp()));
+ break;
+ case HoodieTimeline.ROLLBACK_ACTION:
+ HoodieRollbackMetadata rollbackMetadata =
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+ timeline.getInstantDetails(instant).get());
+ records = Option.of(convertMetadataToRecords(rollbackMetadata,
instant.getTimestamp(), lastSyncTs));
+ break;
+ case HoodieTimeline.RESTORE_ACTION:
+ HoodieRestoreMetadata restoreMetadata =
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+ timeline.getInstantDetails(instant).get());
+ records = Option.of(convertMetadataToRecords(restoreMetadata,
instant.getTimestamp(), lastSyncTs));
+ break;
+ case HoodieTimeline.SAVEPOINT_ACTION:
+ // Nothing to be done here
+ break;
+ default:
+ throw new HoodieException("Unknown type of action " +
instant.getAction());
+ }
+
+ return records;
+ }
+
+  /**
+   * Finds all new files/partitions created as part of a commit and creates metadata table records for them.
+   *
+   * @param commitMetadata {@code HoodieCommitMetadata} of the completed commit
+   * @param instantTime The instant time of the commit
+   * @return a list of metadata table records
+   */
+ public static List<HoodieRecord>
convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String
instantTime) {
+
+ List<HoodieRecord> records = new LinkedList<>();
+ List<String> allPartitions = new LinkedList<>();
+ commitMetadata.getPartitionToWriteStats().forEach((partitionStatName,
writeStats) -> {
+ final String partition = partitionStatName.equals("") ?
NON_PARTITIONED_NAME : partitionStatName;
+ allPartitions.add(partition);
+
+ Map<String, Long> newFiles = new HashMap<>(writeStats.size());
+ writeStats.forEach(hoodieWriteStat -> {
+ String pathWithPartition = hoodieWriteStat.getPath();
+ if (pathWithPartition == null) {
+ // Empty partition
+ LOG.warn("Unable to find path in write stat to update metadata table
" + hoodieWriteStat);
+ return;
+ }
+
+ int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 :
partition.length() + 1;
+ String filename = pathWithPartition.substring(offset);
+ ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate
files in HoodieCommitMetadata");
+ newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
+ });
+
+ // New files added to a partition
+ HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
+ partition, Option.of(newFiles), Option.empty());
+ records.add(record);
+ });
+
+ // New partitions created
+ HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new
ArrayList<>(allPartitions));
+ records.add(record);
+
+    LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType()
+        + ". #partitions_updated=" + allPartitions.size());
+ return records;
+ }
+
+  /**
+   * Finds all files that will be deleted as part of a planned clean and creates metadata table records for them.
+   *
+   * @param cleanerPlan The {@code HoodieCleanerPlan} from the timeline to convert
+   * @param instantTime The instant time of the clean
+   * @return a list of metadata table records
+   */
+ public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanerPlan
cleanerPlan, String instantTime) {
+ List<HoodieRecord> records = new LinkedList<>();
+
+ int[] fileDeleteCount = {0};
+ cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition,
deletedPathInfo) -> {
+ fileDeleteCount[0] += deletedPathInfo.size();
+
+ // Files deleted from a partition
+ List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new
Path(p.getFilePath()).getName())
+ .collect(Collectors.toList());
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
+ Option.of(deletedFilenames));
+ records.add(record);
+ });
+
+ LOG.info("Found at " + instantTime + " from CleanerPlan.
#partitions_updated=" + records.size()
+ + ", #files_deleted=" + fileDeleteCount[0]);
+
+ return records;
+ }
+
+  /**
+   * Finds all files that were deleted as part of a clean and creates metadata table records for them.
+   *
+   * @param cleanMetadata {@code HoodieCleanMetadata} of the completed clean
+   * @param instantTime The instant time of the clean
+   * @return a list of metadata table records
+   */
+ public static List<HoodieRecord>
convertMetadataToRecords(HoodieCleanMetadata cleanMetadata, String instantTime)
{
+ List<HoodieRecord> records = new LinkedList<>();
+ int[] fileDeleteCount = {0};
+
+ cleanMetadata.getPartitionMetadata().forEach((partition,
partitionMetadata) -> {
+ // Files deleted from a partition
+ List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles();
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
+ Option.of(new ArrayList<>(deletedFiles)));
+
+ records.add(record);
+ fileDeleteCount[0] += deletedFiles.size();
+ });
+
+ LOG.info("Found at " + instantTime + " from Clean. #partitions_updated=" +
records.size()
+ + ", #files_deleted=" + fileDeleteCount[0]);
+
+ return records;
+ }
+
+  /**
+   * Aggregates the files deleted and appended to across all rollbacks associated with a restore operation,
+   * then creates metadata table records for them.
+   *
+   * @param restoreMetadata {@code HoodieRestoreMetadata} of the completed restore
+   * @param instantTime The instant time of the restore
+   * @param lastSyncTs The timestamp of the latest instant already synced to the metadata table
+   * @return a list of metadata table records
+   */
+ public static List<HoodieRecord>
convertMetadataToRecords(HoodieRestoreMetadata restoreMetadata, String
instantTime, Option<String> lastSyncTs) {
+ Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+ Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
+ restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
+ rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles,
partitionToAppendedFiles, lastSyncTs));
+ });
+
+ return convertFilesToRecords(partitionToDeletedFiles,
partitionToAppendedFiles, instantTime, "Restore");
+ }
+
+ public static List<HoodieRecord>
convertMetadataToRecords(HoodieRollbackMetadata rollbackMetadata, String
instantTime, Option<String> lastSyncTs) {
+
+ Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+ Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
+ processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles,
partitionToAppendedFiles, lastSyncTs);
+ return convertFilesToRecords(partitionToDeletedFiles,
partitionToAppendedFiles, instantTime, "Rollback");
+ }
+
+  /**
+   * Extracts information about the deleted and appended files from the {@code HoodieRollbackMetadata}.
+   *
+   * During a rollback, files may be deleted (COW, MOR) or rollback blocks may be appended to files (MOR only).
+   * This function extracts these changed files for each partition.
+   *
+   * @param rollbackMetadata {@code HoodieRollbackMetadata}
+   * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
+   * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
+   * @param lastSyncTs The timestamp of the latest instant already synced to the metadata table
+   */
+ private static void processRollbackMetadata(HoodieRollbackMetadata
rollbackMetadata,
+ Map<String, List<String>>
partitionToDeletedFiles,
+ Map<String, Map<String, Long>>
partitionToAppendedFiles,
+ Option<String> lastSyncTs) {
+
+ rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
+ // If commit being rolled back has not been synced to metadata table yet
then there is no need to update metadata
+ if (lastSyncTs.isPresent() &&
HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0),
HoodieTimeline.GREATER_THAN, lastSyncTs.get())) {
+ return;
+ }
+
+ final String partition = pm.getPartitionPath();
+ if (!pm.getSuccessDeleteFiles().isEmpty()) {
+ if (!partitionToDeletedFiles.containsKey(partition)) {
+ partitionToDeletedFiles.put(partition, new ArrayList<>());
+ }
+
+ // Extract deleted file name from the absolute paths saved in
getSuccessDeleteFiles()
+ List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p
-> new Path(p).getName())
+ .collect(Collectors.toList());
+ partitionToDeletedFiles.get(partition).addAll(deletedFiles);
+ }
+
+ if (!pm.getAppendFiles().isEmpty()) {
+ if (!partitionToAppendedFiles.containsKey(partition)) {
+ partitionToAppendedFiles.put(partition, new HashMap<>());
+ }
+
+ // Extract appended file name from the absolute paths saved in
getAppendFiles()
+ pm.getAppendFiles().forEach((path, size) -> {
+          // Sum sizes when the same file is appended to by multiple rollbacks
+          partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, Long::sum);
+        });
+ }
+ });
+ }
+
+ private static List<HoodieRecord> convertFilesToRecords(Map<String,
List<String>> partitionToDeletedFiles,
+ Map<String, Map<String,
Long>> partitionToAppendedFiles, String instantTime,
+ String operation) {
+ List<HoodieRecord> records = new LinkedList<>();
+ int[] fileChangeCount = {0, 0}; // deletes, appends
+
+ partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
+ fileChangeCount[0] += deletedFiles.size();
+
+ Option<Map<String, Long>> filesAdded = Option.empty();
+ if (partitionToAppendedFiles.containsKey(partition)) {
+ filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
+ }
+
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
+ Option.of(new ArrayList<>(deletedFiles)));
+ records.add(record);
+ });
+
+ partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
+ fileChangeCount[1] += appendedFileMap.size();
+
+ // Validate that no appended file has been deleted
+ ValidationUtils.checkState(
+
!appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition,
Collections.emptyList())),
+ "Rollback file cannot both be appended and deleted");
+
+ // New files added to a partition
+ HoodieRecord record =
HoodieMetadataPayload.createPartitionFilesRecord(partition,
Option.of(appendedFileMap),
+ Option.empty());
+ records.add(record);
+ });
+
+ LOG.info("Found at " + instantTime + " from " + operation + ".
#partitions_updated=" + records.size()
+ + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended="
+ fileChangeCount[1]);
+
+ return records;
+ }
+
+}
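
A minimal sketch of the sync flow these conversions enable, assuming a
hypothetical commitToMetadataTable helper standing in for the metadata
writer's actual commit step (the sketch is not part of this commit):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.metadata.HoodieTableMetadataUtil;

    public class MetadataSyncSketch {

      // Convert each unsynced, completed dataset instant to metadata table
      // records and apply them to the metadata table.
      static void syncUnsyncedInstants(HoodieTableMetaClient datasetMetaClient,
          List<HoodieInstant> unsyncedInstants, Option<String> lastSyncTs) throws IOException {
        for (HoodieInstant instant : unsyncedInstants) {
          Option<List<HoodieRecord>> records =
              HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, lastSyncTs);
          if (records.isPresent()) {
            commitToMetadataTable(records.get(), instant.getTimestamp());
          }
        }
      }

      // Hypothetical stand-in; a real writer persists the records to the
      // metadata table at the given instant time.
      static void commitToMetadataTable(List<HoodieRecord> records, String instantTime) {
        // no-op in this sketch
      }
    }

Note that the rollback and restore conversions consult lastSyncTs so that a
commit which was never synced to the metadata table is not rolled back there.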