This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 11e64b2 [HUDI-1717] Metadata Reader should merge all the un-synced
but complete instants from the dataset timeline. (#3082)
11e64b2 is described below
commit 11e64b2db0ddf8f816561f8442b373de15a26d71
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Jun 22 08:52:18 2021 -0700
[HUDI-1717] Metadata Reader should merge all the un-synced but complete
instants from the dataset timeline. (#3082)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 4 +--
.../hudi/metadata/HoodieTableMetadataWriter.java | 6 ++++
.../SparkHoodieBackedTableMetadataWriter.java | 19 +++++++++++
.../hudi/metadata/TestHoodieBackedMetadata.java | 38 ++++++++++++++++++++++
.../apache/hudi/metadata/BaseTableMetadata.java | 34 +++++++++++++++++--
.../hudi/metadata/HoodieBackedTableMetadata.java | 35 ++++++++++----------
6 files changed, 114 insertions(+), 22 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 38a0447..034465d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -400,7 +400,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
// (re) init the metadata for reading.
initTableMetadata();
try {
- List<HoodieInstant> instantsToSync = metadata.findInstantsToSync();
+ List<HoodieInstant> instantsToSync =
metadata.findInstantsToSyncForWriter();
if (instantsToSync.isEmpty()) {
return;
}
@@ -411,7 +411,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
for (HoodieInstant instant : instantsToSync) {
LOG.info("Syncing instant " + instant + " to metadata table");
- Option<List<HoodieRecord>> records =
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant,
metadata.getSyncedInstantTime());
+ Option<List<HoodieRecord>> records =
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant,
getLatestSyncedInstantTime());
if (records.isPresent()) {
commit(records.get(), MetadataPartitionType.FILES.partitionPath(),
instant.getTimestamp());
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index 02c5b9e..1b02a3b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.Option;
import java.io.Serializable;
@@ -40,4 +41,9 @@ public interface HoodieTableMetadataWriter extends
Serializable, AutoCloseable {
void update(HoodieRestoreMetadata restoreMetadata, String instantTime);
void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
+
+ /**
+ * Return the timestamp of the latest instant synced to the metadata table.
+ */
+ Option<String> getLatestSyncedInstantTime();
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 7c12a9e..c014e8b 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -133,6 +135,23 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
}
/**
+ * Return the timestamp of the latest instant synced.
+ *
+ * To sync an instant on the dataset, we create a corresponding delta-commit on
the metadata table. So return the latest
+ * delta-commit.
+ */
+ @Override
+ public Option<String> getLatestSyncedInstantTime() {
+ if (!enabled) {
+ return Option.empty();
+ }
+
+ HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
+ return timeline.getDeltaCommitTimeline().filterCompletedInstants()
+ .lastInstant().map(HoodieInstant::getTimestamp);
+ }
+
+ /**
* Tag each record with the location.
*
* Since we only read the latest base file in a partition, we tag the
records with the instant time of the latest
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index ab731f5..79ccd37 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -491,6 +491,8 @@ public class TestHoodieBackedMetadata extends
HoodieClientTestHarness {
// Various table operations without metadata table enabled
String restoreToInstant;
+ String inflightActionTimestamp;
+ String beforeInflightActionTimestamp;
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfig(true, false))) {
// updates
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
@@ -523,6 +525,10 @@ public class TestHoodieBackedMetadata extends
HoodieClientTestHarness {
assertTrue(metadata(client).isInSync());
}
+ // Record a timestamp for creating an inflight instant for sync testing
+ inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
+ beforeInflightActionTimestamp = newCommitTime;
+
// Deletes
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
records = dataGen.generateDeletes(newCommitTime, 5);
@@ -554,9 +560,41 @@ public class TestHoodieBackedMetadata extends
HoodieClientTestHarness {
assertTrue(metadata(client).isInSync());
}
+ // If there is an incomplete operation, the Metadata Table is not updated
beyond that operation, but the
+ // in-memory merge should consider all the completed operations.
+ Path inflightCleanPath = new Path(metaClient.getMetaPath(),
HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
+ fs.create(inflightCleanPath).close();
+
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfig(true, true))) {
// Restore cannot be done until the metadata table is in sync. See
HUDI-1502 for details
client.syncTableMetadata();
+
+ // Table should sync only before the inflightActionTimestamp
+ HoodieBackedTableMetadataWriter writer =
+
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf,
client.getConfig(), context);
+ assertEquals(writer.getLatestSyncedInstantTime().get(),
beforeInflightActionTimestamp);
+
+ // Reader should sync to all the completed instants
+ HoodieTableMetadata metadata = HoodieTableMetadata.create(context,
client.getConfig().getMetadataConfig(),
+ client.getConfig().getBasePath(),
FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR);
+ assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
+
+ // Remove the inflight instant holding back table sync
+ fs.delete(inflightCleanPath, false);
+ client.syncTableMetadata();
+
+ writer =
+
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf,
client.getConfig(), context);
+ assertEquals(writer.getLatestSyncedInstantTime().get(), newCommitTime);
+
+ // Reader should sync to all the completed instants
+ metadata = HoodieTableMetadata.create(context,
client.getConfig().getMetadataConfig(),
+ client.getConfig().getBasePath(),
FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR);
+ assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
+ }
+
+ // Enable metadata table and ensure it is synced
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfig(true, true))) {
client.restoreToInstant(restoreToInstant);
assertFalse(metadata(client).isInSync());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 0092853..85a4d69 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -62,6 +62,7 @@ public abstract class BaseTableMetadata implements
HoodieTableMetadata {
protected final HoodieMetadataConfig metadataConfig;
// Directory used for Spillable Map when merging records
protected final String spillableMapDirectory;
+ private String syncedInstantTime;
protected boolean enabled;
private TimelineMergedTableMetadata timelineMergedMetadata;
@@ -277,17 +278,44 @@ public abstract class BaseTableMetadata implements
HoodieTableMetadata {
private void openTimelineScanner() {
if (timelineMergedMetadata == null) {
- List<HoodieInstant> unSyncedInstants = findInstantsToSync();
+ List<HoodieInstant> unSyncedInstants = findInstantsToSyncForReader();
timelineMergedMetadata =
new TimelineMergedTableMetadata(datasetMetaClient, unSyncedInstants,
getSyncedInstantTime(), null);
+
+ syncedInstantTime = unSyncedInstants.isEmpty() ?
getLatestDatasetInstantTime()
+ : unSyncedInstants.get(unSyncedInstants.size() - 1).getTimestamp();
+ }
+ }
+
+ /**
+ * Return the timestamp of the latest synced instant.
+ */
+ @Override
+ public Option<String> getSyncedInstantTime() {
+ if (!enabled) {
+ return Option.empty();
}
+
+ return Option.ofNullable(syncedInstantTime);
}
- protected abstract List<HoodieInstant> findInstantsToSync();
+ /**
+ * Return the instants which are not-synced to the {@code
HoodieTableMetadata}.
+ *
+ * This is the list of all completed but un-synced instants.
+ */
+ protected abstract List<HoodieInstant> findInstantsToSyncForReader();
+
+ /**
+ * Return the instants which are not-synced to the {@code
HoodieTableMetadataWriter}.
+ *
+ * This is the list of all completed but un-synced instants which do not
have any incomplete instants in between them.
+ */
+ protected abstract List<HoodieInstant> findInstantsToSyncForWriter();
@Override
public boolean isInSync() {
- return enabled && findInstantsToSync().isEmpty();
+ return enabled && findInstantsToSyncForWriter().isEmpty();
}
protected HoodieEngineContext getEngineContext() {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 66e8f41..f374d61 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -265,7 +264,22 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
* Return an ordered list of instants which have not been synced to the
Metadata Table.
*/
@Override
- protected List<HoodieInstant> findInstantsToSync() {
+ protected List<HoodieInstant> findInstantsToSyncForReader() {
+ return findInstantsToSync(true);
+ }
+
+ /**
+ * Return an ordered list of instants which have not been synced to the
Metadata Table.
+ */
+ @Override
+ protected List<HoodieInstant> findInstantsToSyncForWriter() {
+ return findInstantsToSync(false);
+ }
+
+ /**
+ * Return an ordered list of instants which have not been synced to the
Metadata Table.
+ */
+ private List<HoodieInstant> findInstantsToSync(boolean
ignoreIncompleteInstants) {
initIfNeeded();
// if there are no instants yet, return empty list, since there is nothing
to sync here.
@@ -277,7 +291,8 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
// are candidates for sync.
String latestMetadataInstantTime =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
HoodieDefaultTimeline candidateTimeline =
datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime,
Integer.MAX_VALUE);
- Option<HoodieInstant> earliestIncompleteInstant =
candidateTimeline.filterInflightsAndRequested().firstInstant();
+ Option<HoodieInstant> earliestIncompleteInstant = ignoreIncompleteInstants
? Option.empty()
+ : candidateTimeline.filterInflightsAndRequested().firstInstant();
if (earliestIncompleteInstant.isPresent()) {
return candidateTimeline.filterCompletedInstants()
@@ -289,20 +304,6 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
}
- /**
- * Return the timestamp of the latest compaction instant.
- */
- @Override
- public Option<String> getSyncedInstantTime() {
- if (!enabled) {
- return Option.empty();
- }
-
- HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
- return timeline.getDeltaCommitTimeline().filterCompletedInstants()
- .lastInstant().map(HoodieInstant::getTimestamp);
- }
-
public boolean enabled() {
return enabled;
}