[
https://issues.apache.org/jira/browse/HUDI-2285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408373#comment-17408373
]
ASF GitHub Bot commented on HUDI-2285:
--------------------------------------
vinothchandar commented on a change in pull request #3426:
URL: https://github.com/apache/hudi/pull/3426#discussion_r700518633
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -187,6 +188,7 @@ public boolean commitStats(String instantTime,
List<HoodieWriteStat> stats, Opti
lastCompletedTxnAndMetadata.isPresent() ?
Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
preCommit(instantTime, metadata);
+ table.getMetadataWriter().ifPresent(w ->
((HoodieTableMetadataWriter)w).update(metadata, instantTime));
Review comment:
nts: first committing to metadata table
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -110,23 +119,31 @@ protected HoodieBackedTableMetadataWriter(Configuration
hadoopConf, HoodieWriteC
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(),
"File listing cannot be used for Metadata Table");
initRegistry();
- HoodieTableMetaClient datasetMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
- initialize(engineContext, datasetMetaClient);
- if (enabled) {
- // This is always called even in case the table was created for the
first time. This is because
- // initFromFilesystem() does file listing and hence may take a long
time during which some new updates
- // may have occurred on the table. Hence, calling this always ensures
that the metadata is brought in sync
- // with the active timeline.
- HoodieTimer timer = new HoodieTimer().startTimer();
- syncFromInstants(datasetMetaClient);
- metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR,
timer.endTimer()));
- }
+ this.datasetMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
Review comment:
nts: loading this afresh here.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -396,37 +408,56 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
/**
- * Sync the Metadata Table from the instants created on the dataset.
+ * Initialize shards for a partition.
*
- * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+ * Each shard is a single log file with the following format:
+ * <fileIdPrefix>ABCD
+ * where ABCD are digits. This allows up to 9999 shards.
+ *
+ * Example:
+ * fc9f18eb-6049-4f47-bc51-23884bef0001
+ * fc9f18eb-6049-4f47-bc51-23884bef0002
*/
- private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
- ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it
is not enabled");
- // (re) init the metadata for reading.
- initTableMetadata();
- try {
- List<HoodieInstant> instantsToSync =
metadata.findInstantsToSyncForWriter();
- if (instantsToSync.isEmpty()) {
- return;
- }
-
- LOG.info("Syncing " + instantsToSync.size() + " instants to metadata
table: " + instantsToSync);
-
- // Read each instant in order and sync it to metadata table
- for (HoodieInstant instant : instantsToSync) {
- LOG.info("Syncing instant " + instant + " to metadata table");
-
- Option<List<HoodieRecord>> records =
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant,
getLatestSyncedInstantTime());
- if (records.isPresent()) {
- commit(records.get(), MetadataPartitionType.FILES.partitionPath(),
instant.getTimestamp());
- }
+ private void initializeShards(HoodieTableMetaClient datasetMetaClient,
String partition, String instantTime,
+ int shardCount) throws IOException {
+ ValidationUtils.checkArgument(shardCount <= 9999, "Maximum 9999 shards are
supported.");
+
+ final String newFileId = FSUtils.createNewFileIdPfx();
+ final String newFileIdPrefix = newFileId.substring(0, 32);
+ final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
+ blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
+ final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0],
blockHeader);
+
+ LOG.info(String.format("Creating %d shards for partition %s with base
fileId %s at instant time %s",
+ shardCount, partition, newFileId, instantTime));
+ for (int i = 0; i < shardCount; ++i) {
+ // Generate a indexed fileId for each shard and write a log block into
it to create the file.
+ final String shardFileId = String.format("%s%04d", newFileIdPrefix, i +
1);
+ ValidationUtils.checkArgument(newFileId.length() ==
shardFileId.length(), "FileId should be of length " + newFileId.length());
+ try {
+ HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(),
partition))
+ .withFileId(shardFileId).overBaseCommit(instantTime)
+ .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
+ .withFileSize(0L)
+ .withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
+ .withFs(datasetMetaClient.getFs())
+ .withRolloverLogWriteToken(FSUtils.makeWriteToken(0, 0, 0))
+ .withLogWriteToken(FSUtils.makeWriteToken(0, 0, 0))
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+ writer.appendBlock(block);
+ writer.close();
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to created record level index shard " +
shardFileId, e);
}
- initTableMetadata();
- } catch (IOException ioe) {
- throw new HoodieIOException("Unable to sync instants from data to
metadata table.", ioe);
}
}
+ protected String getShardFileName(String fileId, int shardIndex) {
+ ValidationUtils.checkArgument(shardIndex <= 9999, "Maximum 9999 shards are
supported.");
Review comment:
`9999`
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -396,37 +408,56 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
/**
- * Sync the Metadata Table from the instants created on the dataset.
+ * Initialize shards for a partition.
*
- * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+ * Each shard is a single log file with the following format:
+ * <fileIdPrefix>ABCD
+ * where ABCD are digits. This allows up to 9999 shards.
+ *
+ * Example:
+ * fc9f18eb-6049-4f47-bc51-23884bef0001
+ * fc9f18eb-6049-4f47-bc51-23884bef0002
*/
- private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
- ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it
is not enabled");
- // (re) init the metadata for reading.
- initTableMetadata();
- try {
- List<HoodieInstant> instantsToSync =
metadata.findInstantsToSyncForWriter();
- if (instantsToSync.isEmpty()) {
- return;
- }
-
- LOG.info("Syncing " + instantsToSync.size() + " instants to metadata
table: " + instantsToSync);
-
- // Read each instant in order and sync it to metadata table
- for (HoodieInstant instant : instantsToSync) {
- LOG.info("Syncing instant " + instant + " to metadata table");
-
- Option<List<HoodieRecord>> records =
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant,
getLatestSyncedInstantTime());
- if (records.isPresent()) {
- commit(records.get(), MetadataPartitionType.FILES.partitionPath(),
instant.getTimestamp());
- }
+ private void initializeShards(HoodieTableMetaClient datasetMetaClient,
String partition, String instantTime,
+ int shardCount) throws IOException {
+ ValidationUtils.checkArgument(shardCount <= 9999, "Maximum 9999 shards are
supported.");
+
+ final String newFileId = FSUtils.createNewFileIdPfx();
+ final String newFileIdPrefix = newFileId.substring(0, 32);
+ final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
+ blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
+ final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0],
blockHeader);
+
+ LOG.info(String.format("Creating %d shards for partition %s with base
fileId %s at instant time %s",
+ shardCount, partition, newFileId, instantTime));
+ for (int i = 0; i < shardCount; ++i) {
+ // Generate a indexed fileId for each shard and write a log block into
it to create the file.
+ final String shardFileId = String.format("%s%04d", newFileIdPrefix, i +
1);
+ ValidationUtils.checkArgument(newFileId.length() ==
shardFileId.length(), "FileId should be of length " + newFileId.length());
+ try {
+ HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(),
partition))
+ .withFileId(shardFileId).overBaseCommit(instantTime)
+ .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
+ .withFileSize(0L)
Review comment:
legit?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +70,20 @@ protected HoodieSparkTable(HoodieWriteConfig config,
HoodieEngineContext context
protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext
context) {
return SparkHoodieIndex.createIndex(config);
}
+
+ @Override
+ public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+ if (!config.useFileListingMetadata()) {
+ return Option.empty();
+ }
+
+ try {
+ if (!metaClient.getFs().exists(new
Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
Review comment:
could we avoid this `exists()` somehow?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -435,7 +425,6 @@ protected void postCommit(HoodieTable<T, I, K, O> table,
HoodieCommitMetadata me
HoodieTimelineArchiveLog archiveLog = new
HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
autoCleanOnCommit();
- syncTableMetadata();
Review comment:
there is no more additional sync process, with this re-design.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -73,14 +73,13 @@
// Metadata table's timeline and metaclient
private HoodieTableMetaClient metaClient;
private HoodieTableConfig tableConfig;
- private List<FileSlice> latestFileSystemMetadataSlices;
// should we reuse the open file handles, across calls
private final boolean reuse;
-
- // Readers for the base and log file which store the metadata
- private transient HoodieFileReader<GenericRecord> baseFileReader;
- private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
+ // Shards for each partition
+ private Map<String, List<FileSlice>> partitionToShardsMap;
Review comment:
file groups?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -86,10 +93,12 @@
protected HoodieBackedTableMetadata metadata;
protected HoodieTableMetaClient metaClient;
+ protected HoodieTableMetaClient datasetMetaClient;
protected Option<HoodieMetadataMetrics> metrics;
protected boolean enabled;
protected SerializableConfiguration hadoopConf;
protected final transient HoodieEngineContext engineContext;
+ protected TransactionManager txnManager;
Review comment:
nts: this is so that metadata table itself can take multiple writers
committing at the same time.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -464,8 +487,14 @@ public void update(HoodieCleanerPlan cleanerPlan, String
instantTime) {
@Override
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
if (enabled) {
- List<HoodieRecord> records =
HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime);
- commit(records, MetadataPartitionType.FILES.partitionPath(),
instantTime);
+ this.txnManager.beginTransaction(Option.of(new
HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime)),
Review comment:
can we share this code across these update() method overload
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -396,37 +408,56 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
/**
- * Sync the Metadata Table from the instants created on the dataset.
+ * Initialize shards for a partition.
*
- * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+ * Each shard is a single log file with the following format:
+ * <fileIdPrefix>ABCD
+ * where ABCD are digits. This allows up to 9999 shards.
+ *
+ * Example:
+ * fc9f18eb-6049-4f47-bc51-23884bef0001
+ * fc9f18eb-6049-4f47-bc51-23884bef0002
*/
- private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
- ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it
is not enabled");
- // (re) init the metadata for reading.
- initTableMetadata();
- try {
- List<HoodieInstant> instantsToSync =
metadata.findInstantsToSyncForWriter();
- if (instantsToSync.isEmpty()) {
- return;
- }
-
- LOG.info("Syncing " + instantsToSync.size() + " instants to metadata
table: " + instantsToSync);
-
- // Read each instant in order and sync it to metadata table
- for (HoodieInstant instant : instantsToSync) {
- LOG.info("Syncing instant " + instant + " to metadata table");
-
- Option<List<HoodieRecord>> records =
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant,
getLatestSyncedInstantTime());
- if (records.isPresent()) {
- commit(records.get(), MetadataPartitionType.FILES.partitionPath(),
instant.getTimestamp());
- }
+ private void initializeShards(HoodieTableMetaClient datasetMetaClient,
String partition, String instantTime,
+ int shardCount) throws IOException {
+ ValidationUtils.checkArgument(shardCount <= 9999, "Maximum 9999 shards are
supported.");
Review comment:
add a constant for `9999`
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -399,14 +397,6 @@ protected void preWrite(String instantTime,
WriteOperationType writeOperationTyp
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
- this.txnManager.beginTransaction(Option.of(new
HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)),
lastCompletedTxnAndMetadata
Review comment:
nts: this lock was only being taken for purposes of syncing. So removing
this is fine.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -304,6 +315,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext
engineContext, Hoodi
.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
initTableMetadata();
+ initializeShards(datasetMetaClient,
MetadataPartitionType.FILES.partitionPath(), createInstantTime, 1);
Review comment:
we should try to avoid introducing the new shard terminology. We should
have it for bucketing, if thats what we intend it for.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -492,8 +527,26 @@ public void update(HoodieRestoreMetadata restoreMetadata,
String instantTime) {
@Override
public void update(HoodieRollbackMetadata rollbackMetadata, String
instantTime) {
if (enabled) {
- List<HoodieRecord> records =
HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime,
metadata.getSyncedInstantTime());
- commit(records, MetadataPartitionType.FILES.partitionPath(),
instantTime);
+ this.txnManager.beginTransaction(Option.of(new
HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime)),
+ Option.empty());
+ try {
+ // Is this rollback of an instant that has been synced to the metadata
table?
+ String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
+ boolean wasSynced = metaClient.getActiveTimeline().containsInstant(new
HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant));
+ if (!wasSynced) {
+ // A compaction may have taken place on metadata table which would
have included this instant being rolled back.
Review comment:
any scope for simplifyinf this?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
##########
@@ -32,18 +36,36 @@
*/
public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable
{
- void update(HoodieCommitMetadata commitMetadata, String instantTime);
-
- void update(HoodieCleanerPlan cleanerPlan, String instantTime);
+ // Update the metadata table due to a COMMIT operation
+ void update(HoodieCommitMetadata option, String instantTime);
+ // Update the metadata table due to a CLEAN operation
void update(HoodieCleanMetadata cleanMetadata, String instantTime);
+ // Update the metadata table due to a RESTORE operation
void update(HoodieRestoreMetadata restoreMetadata, String instantTime);
+ // Update the metadata table due to a ROLLBACK operation
void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
/**
* Return the timestamp of the latest instant synced to the metadata table.
*/
Option<String> getLatestSyncedInstantTime();
+
+ /**
+ * Remove the metadata table for the dataset.
+ *
+ * @param basePath base path of the dataset
+ * @param context
+ */
+ static void removeMetadataTable(String basePath, HoodieEngineContext
context) {
Review comment:
rename `deleteMetadataTable`
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -396,37 +408,56 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
/**
- * Sync the Metadata Table from the instants created on the dataset.
+ * Initialize shards for a partition.
*
- * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+ * Each shard is a single log file with the following format:
+ * <fileIdPrefix>ABCD
+ * where ABCD are digits. This allows up to 9999 shards.
+ *
+ * Example:
+ * fc9f18eb-6049-4f47-bc51-23884bef0001
+ * fc9f18eb-6049-4f47-bc51-23884bef0002
*/
- private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
- ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it
is not enabled");
- // (re) init the metadata for reading.
- initTableMetadata();
- try {
- List<HoodieInstant> instantsToSync =
metadata.findInstantsToSyncForWriter();
- if (instantsToSync.isEmpty()) {
- return;
- }
-
- LOG.info("Syncing " + instantsToSync.size() + " instants to metadata
table: " + instantsToSync);
-
- // Read each instant in order and sync it to metadata table
- for (HoodieInstant instant : instantsToSync) {
- LOG.info("Syncing instant " + instant + " to metadata table");
-
- Option<List<HoodieRecord>> records =
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant,
getLatestSyncedInstantTime());
- if (records.isPresent()) {
- commit(records.get(), MetadataPartitionType.FILES.partitionPath(),
instant.getTimestamp());
- }
+ private void initializeShards(HoodieTableMetaClient datasetMetaClient,
String partition, String instantTime,
+ int shardCount) throws IOException {
+ ValidationUtils.checkArgument(shardCount <= 9999, "Maximum 9999 shards are
supported.");
+
+ final String newFileId = FSUtils.createNewFileIdPfx();
+ final String newFileIdPrefix = newFileId.substring(0, 32);
+ final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
+ blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
+ final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0],
blockHeader);
Review comment:
we are writing an empty delete block? why? just to create the file
group/shard upfront?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
##########
@@ -32,18 +36,36 @@
*/
public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable
{
- void update(HoodieCommitMetadata commitMetadata, String instantTime);
-
- void update(HoodieCleanerPlan cleanerPlan, String instantTime);
+ // Update the metadata table due to a COMMIT operation
+ void update(HoodieCommitMetadata option, String instantTime);
Review comment:
why `option`?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -299,6 +302,7 @@ protected void completeCompaction(HoodieCommitMetadata
metadata, JavaRDD<WriteSt
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction");
List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
finalizeWrite(table, compactionCommitTime, writeStats);
+ table.getMetadataWriter().ifPresent(w -> w.update(metadata,
compactionCommitTime));
Review comment:
nts: audit all paths that commit to timeline and ensure this is done
everywhere. this may also be an opportunity to streamline such code occurrences.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -200,20 +200,19 @@ public boolean archiveIfRequired(HoodieEngineContext
context) throws IOException
.collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
HoodieInstant.getComparableAction(i.getAction()))));
- // If metadata table is enabled, do not archive instants which are more
recent that the latest synced
- // instant on the metadata table. This is required for metadata table sync.
+ // If metadata table is enabled, do not archive instants which are more
recent that the last compaction on the
+ // metadata table.
if (config.useFileListingMetadata()) {
try (HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(),
config.getBasePath(),
FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue())) {
- Option<String> lastSyncedInstantTime =
tableMetadata.getSyncedInstantTime();
-
- if (lastSyncedInstantTime.isPresent()) {
- LOG.info("Limiting archiving of instants to last synced instant on
metadata table at " + lastSyncedInstantTime.get());
- instants = instants.filter(i ->
HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
- lastSyncedInstantTime.get()));
- } else {
- LOG.info("Not archiving as there is no instants yet on the metadata
table");
+ Option<String> latestCompactionTime =
tableMetadata.getLatestCompactionTime();
Review comment:
Need to understand why it matters that metadata table be compacted
before it commit can be be archived on the data timelien
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -152,49 +134,57 @@ protected void commit(List<HoodieRecord> records, String
partitionName, String i
}
/**
- * Tag each record with the location.
+ * Perform a compaction on the Metadata Table.
*
- * Since we only read the latest base file in a partition, we tag the
records with the instant time of the latest
- * base file.
+ * Cases to be handled:
+ * 1. We cannot perform compaction if there are previous inflight
operations on the dataset. This is because
+ * a compacted metadata base file at time Tx should represent all the
actions on the dataset till time Tx.
+ *
+ * 2. In multi-writer scenario, a parallel operation with a greater
instantTime may have completed creating a
+ * deltacommit.
*/
- private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String
partitionName) {
- HoodieTable table = HoodieSparkTable.create(metadataWriteConfig,
engineContext);
- TableFileSystemView.SliceView fsView = table.getSliceView();
- List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
- .map(FileSlice::getBaseFile)
- .filter(Option::isPresent)
- .map(Option::get)
- .collect(Collectors.toList());
-
- // All the metadata fits within a single base file
- if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
- if (baseFiles.size() > 1) {
- throw new HoodieMetadataException("Multiple base files found in
metadata partition");
- }
+ private void compactIfNecessary(SparkRDDWriteClient writeClient, String
instantTime) {
+ String latestDeltacommitTime =
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+ .get().getTimestamp();
+ List<HoodieInstant> pendingInstants =
datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+
.findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+ if (!pendingInstants.isEmpty()) {
+ LOG.info(String.format("Cannot compact metadata table as there are %d
inflight instants before latest deltacommit %s: %s",
+ pendingInstants.size(), latestDeltacommitTime,
Arrays.toString(pendingInstants.toArray())));
+ return;
}
- JavaSparkContext jsc = ((HoodieSparkEngineContext)
engineContext).getJavaSparkContext();
- String fileId;
- String instantTime;
- if (!baseFiles.isEmpty()) {
- fileId = baseFiles.get(0).getFileId();
- instantTime = baseFiles.get(0).getCommitTime();
- } else {
- // If there is a log file then we can assume that it has the data
- List<HoodieLogFile> logFiles =
fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
- .map(FileSlice::getLatestLogFile)
- .filter(Option::isPresent)
- .map(Option::get)
- .collect(Collectors.toList());
- if (logFiles.isEmpty()) {
- // No base and log files. All are new inserts
- return jsc.parallelize(records, 1);
- }
-
- fileId = logFiles.get(0).getFileId();
- instantTime = logFiles.get(0).getBaseCommitTime();
+ // Trigger compaction with suffixes based on the same instant time. This
ensures that any future
+ // delta commits synced over will not have an instant time lesser than the
last completed instant on the
+ // metadata table.
+ final String compactionInstantTime = latestDeltacommitTime + "001";
+ if (writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
+ writeClient.compact(compactionInstantTime);
}
+ }
+
+ private void cleanIfNecessary(SparkRDDWriteClient writeClient, String
instantTime) {
Review comment:
why the `IfNecessary` part?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -96,6 +94,11 @@ public SparkRDDWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeC
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig
writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
+ if (config.useFileListingMetadata()) {
Review comment:
should we use a flag for metadata table enable/disable ? not just file
listing
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+/**
+ * Upgrade handle to assist in upgrading hoodie table from version 1 to 2.
+ */
+public class OneToTwoUpgradeHandler implements UpgradeHandler {
Review comment:
I thiink this will now be TwoToThree
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -101,7 +94,7 @@ protected void initialize(HoodieEngineContext engineContext,
HoodieTableMetaClie
@Override
protected void commit(List<HoodieRecord> records, String partitionName,
String instantTime) {
Review comment:
nts: this cannot be a List<> for the record level index
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java
##########
@@ -48,7 +48,9 @@ public void run(HoodieTableMetaClient metaClient,
@Override
protected void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion
toVersion, String instantTime) {
- if (fromVersion == HoodieTableVersion.ZERO && toVersion ==
HoodieTableVersion.ONE) {
+ if (fromVersion == HoodieTableVersion.ONE && toVersion ==
HoodieTableVersion.TWO) {
Review comment:
there are some conflicts to be resolved here.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -40,17 +40,10 @@
// Enable the internal Metadata Table which saves file listings
public static final ConfigProperty<Boolean> METADATA_ENABLE_PROP =
ConfigProperty
.key(METADATA_PREFIX + ".enable")
- .defaultValue(false)
+ .defaultValue(true)
Review comment:
so all tests pass now?!!!
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -229,116 +196,11 @@ protected BaseTableMetadata(HoodieEngineContext
engineContext, HoodieMetadataCon
statuses =
hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath);
}
- if (metadataConfig.validateFileListingMetadata()) {
Review comment:
yay!
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -331,4 +301,28 @@ public HoodieTableMetaClient getMetaClient() {
public Map<String, String> stats() {
return metrics.map(m -> m.getStats(true, metaClient, this)).orElse(new
HashMap<>());
}
+
+ @Override
+ public Option<String> getSyncedInstantTime() {
Review comment:
these methods are the same!
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -340,4 +297,77 @@ private static void
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
return records;
}
+
+ /**
+ * Returns a list of commits which were rolled back as part of a Rollback or
Restore operation.
+ *
+ * @param instant The Rollback operation to read
+ * @param timeline
+ */
+ public static List<String> getCommitsRolledback(HoodieInstant instant,
HoodieActiveTimeline timeline) {
Review comment:
this should belong some place else?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -223,11 +174,17 @@
return convertFilesToRecords(partitionToDeletedFiles,
partitionToAppendedFiles, instantTime, "Restore");
}
- public static List<HoodieRecord>
convertMetadataToRecords(HoodieRollbackMetadata rollbackMetadata, String
instantTime, Option<String> lastSyncTs) {
+ public static List<HoodieRecord>
convertMetadataToRecords(HoodieRollbackMetadata rollbackMetadata, String
instantTime,
+ Option<String> lastSyncTs, boolean wasSynced) {
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles,
partitionToAppendedFiles, lastSyncTs);
+ if (!wasSynced) {
Review comment:
nts: need to revisit again with rollback/restore issues fixed
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
throw new HoodieIOException("Error merging records from metadata table
for key :" + key, ioe);
} finally {
if (!reuse) {
- closeOrThrow();
+ close(partitionName);
}
}
}
- private void openReadersIfNeededOrThrow() {
- try {
- openReadersIfNeeded();
- } catch (IOException e) {
- throw new HoodieIOException("Error opening readers to the Metadata
Table: ", e);
- }
- }
-
/**
* Returns a new pair of readers to the base and log files.
*/
- private void openReadersIfNeeded() throws IOException {
- if (reuse && (baseFileReader != null || logRecordScanner != null)) {
- // quickly exit out without synchronizing if reusing and readers are
already open
- return;
- }
-
- // we always force synchronization, if reuse=false, to handle concurrent
close() calls as well.
- synchronized (this) {
- if (baseFileReader != null || logRecordScanner != null) {
- return;
- }
-
- final long baseFileOpenMs;
- final long logScannerOpenMs;
-
- // Metadata is in sync till the latest completed instant on the dataset
- HoodieTimer timer = new HoodieTimer().startTimer();
- String latestInstantTime = getLatestDatasetInstantTime();
- ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() ==
1, "must be at-least one valid metadata file slice");
-
- // If the base file is present then create a reader
- Option<HoodieBaseFile> basefile =
latestFileSystemMetadataSlices.get(0).getBaseFile();
- if (basefile.isPresent()) {
- String basefilePath = basefile.get().getPath();
- baseFileReader =
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
- baseFileOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata base file from %s at instant
%s in %d ms", basefilePath,
- basefile.get().getCommitTime(), baseFileOpenMs));
- } else {
- baseFileOpenMs = 0;
- timer.endTimer();
- }
-
- // Open the log record scanner using the log files from the latest file
slice
- timer.startTimer();
- List<String> logFilePaths =
latestFileSystemMetadataSlices.get(0).getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(o -> o.getPath().toString())
- .collect(Collectors.toList());
- Option<HoodieInstant> lastInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
- String latestMetaInstantTimestamp =
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
- // Load the schema
- Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
- logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
- .withFileSystem(metaClient.getFs())
- .withBasePath(metadataBasePath)
- .withLogFilePaths(logFilePaths)
- .withReaderSchema(schema)
- .withLatestInstantTime(latestMetaInstantTimestamp)
- .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
- .withBufferSize(BUFFER_SIZE)
- .withSpillableMapBasePath(spillableMapDirectory)
- .withDiskMapType(commonConfig.getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
- .build();
-
- logScannerOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata log files from %s at instant
(dataset instant=%s, metadata instant=%s) in %d ms",
- logFilePaths, latestInstantTime, latestMetaInstantTimestamp,
logScannerOpenMs));
-
- metrics.ifPresent(metrics ->
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs +
logScannerOpenMs));
- }
- }
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>
openReadersIfNeeded(String key, String partitionName) throws IOException {
Review comment:
nts: ensure all the reuse of these readers for timeline server etc is
working really well
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -340,4 +297,77 @@ private static void
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
return records;
}
+
+ /**
+ * Returns a list of commits which were rolled back as part of a Rollback or
Restore operation.
+ *
+ * @param instant The Rollback operation to read
+ * @param timeline
+ */
+ public static List<String> getCommitsRolledback(HoodieInstant instant,
HoodieActiveTimeline timeline) {
+ try {
+ if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
+ HoodieRollbackMetadata rollbackMetadata =
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+ timeline.getInstantDetails(instant).get());
+ return rollbackMetadata.getCommitsRollback();
+ }
+
+ List<String> commitsRolledback = new LinkedList<>();
+
+ if (instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)) {
+ // Restore is made up of several rollbacks
+ HoodieRestoreMetadata restoreMetadata =
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+ timeline.getInstantDetails(instant).get());
+ restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
+ rms.forEach(rm -> commitsRolledback.addAll(rm.getCommitsRollback()));
+ });
+ }
+
+ return commitsRolledback;
+ } catch (IOException e) {
+ throw new HoodieMetadataException("Error retrieving rollback commits for
instant " + instant, e);
+ }
+ }
+
+ /**
+ * Map a key to a shard.
+ *
+ * Note: For hashing, the algorithm is same as String.hashCode() but is
being defined here as hashCode()
+ * implementation is not guaranteed by the JVM to be consistent across JVM
versions and implementations.
+ *
+ * @param str
+ * @return An integer hash of the given string
+ */
+ public static int keyToShard(String str, int numShards) {
+ int h = 0;
+ for (int i = 0; i < str.length(); ++i) {
+ h = 31 * h + str.charAt(i);
+ }
+
+ return Math.abs(Math.abs(h) % numShards);
+ }
+
+ /**
+ * Loads the list of shards for a partition of the Metadata Table.
+ *
+ * The list of shards is returned sorted in the correct order of shard index.
+ * @param metaClient
+ * @param partition The name of the partition whose shards are to be loaded.
+ * @return List of shards
+ */
+ public static List<FileSlice> loadPartitionShards(HoodieTableMetaClient
metaClient, String partition) {
Review comment:
lets streamline all this naming.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
throw new HoodieIOException("Error merging records from metadata table
for key :" + key, ioe);
} finally {
if (!reuse) {
- closeOrThrow();
+ close(partitionName);
}
}
}
- private void openReadersIfNeededOrThrow() {
- try {
- openReadersIfNeeded();
- } catch (IOException e) {
- throw new HoodieIOException("Error opening readers to the Metadata
Table: ", e);
- }
- }
-
/**
* Returns a new pair of readers to the base and log files.
*/
- private void openReadersIfNeeded() throws IOException {
- if (reuse && (baseFileReader != null || logRecordScanner != null)) {
- // quickly exit out without synchronizing if reusing and readers are
already open
- return;
- }
-
- // we always force synchronization, if reuse=false, to handle concurrent
close() calls as well.
- synchronized (this) {
- if (baseFileReader != null || logRecordScanner != null) {
- return;
- }
-
- final long baseFileOpenMs;
- final long logScannerOpenMs;
-
- // Metadata is in sync till the latest completed instant on the dataset
- HoodieTimer timer = new HoodieTimer().startTimer();
- String latestInstantTime = getLatestDatasetInstantTime();
- ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() ==
1, "must be at-least one valid metadata file slice");
-
- // If the base file is present then create a reader
- Option<HoodieBaseFile> basefile =
latestFileSystemMetadataSlices.get(0).getBaseFile();
- if (basefile.isPresent()) {
- String basefilePath = basefile.get().getPath();
- baseFileReader =
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
- baseFileOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata base file from %s at instant
%s in %d ms", basefilePath,
- basefile.get().getCommitTime(), baseFileOpenMs));
- } else {
- baseFileOpenMs = 0;
- timer.endTimer();
- }
-
- // Open the log record scanner using the log files from the latest file
slice
- timer.startTimer();
- List<String> logFilePaths =
latestFileSystemMetadataSlices.get(0).getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(o -> o.getPath().toString())
- .collect(Collectors.toList());
- Option<HoodieInstant> lastInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
- String latestMetaInstantTimestamp =
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
- // Load the schema
- Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
- logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
- .withFileSystem(metaClient.getFs())
- .withBasePath(metadataBasePath)
- .withLogFilePaths(logFilePaths)
- .withReaderSchema(schema)
- .withLatestInstantTime(latestMetaInstantTimestamp)
- .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
- .withBufferSize(BUFFER_SIZE)
- .withSpillableMapBasePath(spillableMapDirectory)
- .withDiskMapType(commonConfig.getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
- .build();
-
- logScannerOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata log files from %s at instant
(dataset instant=%s, metadata instant=%s) in %d ms",
- logFilePaths, latestInstantTime, latestMetaInstantTimestamp,
logScannerOpenMs));
-
- metrics.ifPresent(metrics ->
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs +
logScannerOpenMs));
- }
- }
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>
openReadersIfNeeded(String key, String partitionName) throws IOException {
+ return shardReaders.computeIfAbsent(partitionName, k -> {
+ try {
+ final long baseFileOpenMs;
+ final long logScannerOpenMs;
+ HoodieFileReader baseFileReader = null;
+ HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+
+ // Metadata is in sync till the latest completed instant on the dataset
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List<FileSlice> shards =
HoodieTableMetadataUtil.loadPartitionShards(metaClient, partitionName);
+ ValidationUtils.checkArgument(shards.size() == 1,
String.format("Invalid number of shards: found=%d, required=%d", shards.size(),
1));
+ final FileSlice slice =
shards.get(HoodieTableMetadataUtil.keyToShard(key, shards.size()));
+
+ // If the base file is present then create a reader
+ Option<HoodieBaseFile> basefile = slice.getBaseFile();
+ if (basefile.isPresent()) {
Review comment:
do we send initial data to log files? without any base? is this why we
are creating the log files with empty delete block upfront?
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -141,13 +141,14 @@ protected void commit(Option<Map<String, String>>
extraMetadata, HoodieWriteMeta
result.setWriteStats(writeStats);
// Finalize write
finalizeWrite(instantTime, writeStats, result);
- syncTableMetadata();
try {
LOG.info("Committing " + instantTime + ", action Type " +
getCommitActionType());
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats,
result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(),
getCommitActionType());
+ syncTableMetadata(metadata);
Review comment:
For Flink, this code is still executed at the driver, right?
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java
##########
@@ -43,7 +43,9 @@ public void run(HoodieTableMetaClient metaClient,
HoodieTableVersion toVersion,
@Override
protected void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion
toVersion, String instantTime) {
- if (fromVersion == HoodieTableVersion.ZERO && toVersion ==
HoodieTableVersion.ONE) {
+ if (fromVersion == HoodieTableVersion.ONE && toVersion ==
HoodieTableVersion.TWO) {
+ // TODO:
Review comment:
+1
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -381,6 +387,57 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
return partitionToFileStatus;
}
+ /**
+ * Initialize shards for a partition.
+ *
+ * Each shard is a single log file with the following format:
+ * <fileIdPrefix>ABCD
+ * where ABCD are digits. This allows up to 9999 shards.
+ *
+ * Example:
+ * fc9f18eb-6049-4f47-bc51-23884bef0001
+ * fc9f18eb-6049-4f47-bc51-23884bef0002
Review comment:
if we use the 0000-9999 as a hash partition, then we cannot reuse that?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
##########
@@ -46,4 +50,24 @@ public BaseActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
}
public abstract R execute();
+
+ protected final void syncTableMetadata(HoodieCommitMetadata metadata) {
Review comment:
+1
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
throw new HoodieIOException("Error merging records from metadata table
for key :" + key, ioe);
} finally {
if (!reuse) {
- closeOrThrow();
+ close(partitionName);
}
}
}
- private void openReadersIfNeededOrThrow() {
- try {
- openReadersIfNeeded();
- } catch (IOException e) {
- throw new HoodieIOException("Error opening readers to the Metadata
Table: ", e);
- }
- }
-
/**
* Returns a new pair of readers to the base and log files.
*/
- private void openReadersIfNeeded() throws IOException {
- if (reuse && (baseFileReader != null || logRecordScanner != null)) {
- // quickly exit out without synchronizing if reusing and readers are
already open
- return;
- }
-
- // we always force synchronization, if reuse=false, to handle concurrent
close() calls as well.
- synchronized (this) {
- if (baseFileReader != null || logRecordScanner != null) {
- return;
- }
-
- final long baseFileOpenMs;
- final long logScannerOpenMs;
-
- // Metadata is in sync till the latest completed instant on the dataset
- HoodieTimer timer = new HoodieTimer().startTimer();
- String latestInstantTime = getLatestDatasetInstantTime();
- ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() ==
1, "must be at-least one valid metadata file slice");
-
- // If the base file is present then create a reader
- Option<HoodieBaseFile> basefile =
latestFileSystemMetadataSlices.get(0).getBaseFile();
- if (basefile.isPresent()) {
- String basefilePath = basefile.get().getPath();
- baseFileReader =
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
- baseFileOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata base file from %s at instant
%s in %d ms", basefilePath,
- basefile.get().getCommitTime(), baseFileOpenMs));
- } else {
- baseFileOpenMs = 0;
- timer.endTimer();
- }
-
- // Open the log record scanner using the log files from the latest file
slice
- timer.startTimer();
- List<String> logFilePaths =
latestFileSystemMetadataSlices.get(0).getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(o -> o.getPath().toString())
- .collect(Collectors.toList());
- Option<HoodieInstant> lastInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
- String latestMetaInstantTimestamp =
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
- // Load the schema
- Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
- logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
- .withFileSystem(metaClient.getFs())
- .withBasePath(metadataBasePath)
- .withLogFilePaths(logFilePaths)
- .withReaderSchema(schema)
- .withLatestInstantTime(latestMetaInstantTimestamp)
- .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
- .withBufferSize(BUFFER_SIZE)
- .withSpillableMapBasePath(spillableMapDirectory)
- .withDiskMapType(commonConfig.getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
- .build();
-
- logScannerOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata log files from %s at instant
(dataset instant=%s, metadata instant=%s) in %d ms",
- logFilePaths, latestInstantTime, latestMetaInstantTimestamp,
logScannerOpenMs));
-
- metrics.ifPresent(metrics ->
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs +
logScannerOpenMs));
- }
- }
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>
openReadersIfNeeded(String key, String partitionName) throws IOException {
+ return shardReaders.computeIfAbsent(partitionName, k -> {
+ try {
+ final long baseFileOpenMs;
+ final long logScannerOpenMs;
+ HoodieFileReader baseFileReader = null;
+ HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+
+ // Metadata is in sync till the latest completed instant on the dataset
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List<FileSlice> shards =
HoodieTableMetadataUtil.loadPartitionShards(metaClient, partitionName);
+ ValidationUtils.checkArgument(shards.size() == 1,
String.format("Invalid number of shards: found=%d, required=%d", shards.size(),
1));
+ final FileSlice slice =
shards.get(HoodieTableMetadataUtil.keyToShard(key, shards.size()));
+
+ // If the base file is present then create a reader
+ Option<HoodieBaseFile> basefile = slice.getBaseFile();
+ if (basefile.isPresent()) {
+ String basefilePath = basefile.get().getPath();
+ baseFileReader =
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+ baseFileOpenMs = timer.endTimer();
+ LOG.info(String.format("Opened metadata base file from %s at instant
%s in %d ms", basefilePath,
+ basefile.get().getCommitTime(), baseFileOpenMs));
+ } else {
+ baseFileOpenMs = 0;
+ timer.endTimer();
+ }
- private void close(HoodieFileReader localFileReader,
HoodieMetadataMergedLogRecordScanner localLogScanner) {
- try {
- if (localFileReader != null) {
- localFileReader.close();
- }
- if (localLogScanner != null) {
- localLogScanner.close();
+ // Open the log record scanner using the log files from the latest
file slice
+ timer.startTimer();
+ List<String> logFilePaths = slice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(o -> o.getPath().toString())
+ .collect(Collectors.toList());
+
+ // Only those log files which have a corresponding completed instant
on the dataset should be read
+ // This is because the metadata table is updated before the dataset
instants are committed.
+ HoodieActiveTimeline datasetTimeline =
datasetMetaClient.getActiveTimeline();
+ Set<String> validInstantTimestamps =
datasetTimeline.filterCompletedInstants().getInstants()
+ .map(i -> i.getTimestamp()).collect(Collectors.toSet());
+
+ // For any rollbacks and restores, we cannot neglect the instants that
they are rolling back.
+ // The rollback instant should be more recent than the start of the
timeline for it to have rolled back any
+ // instant which we have a log block for.
+ final String minInstantTime = validInstantTimestamps.isEmpty() ?
SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
+
datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstants()
+ .filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN, minInstantTime))
+ .forEach(instant -> {
+
validInstantTimestamps.addAll(HoodieTableMetadataUtil.getCommitsRolledback(instant,
datasetTimeline));
+ });
+
+ // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid
timestamp
+ validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP);
+
+ Option<HoodieInstant> lastInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+ String latestMetaInstantTimestamp =
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
+ // Load the schema
+ Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+ HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
+ logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
+ .withFileSystem(metaClient.getFs())
+ .withBasePath(metadataBasePath)
+ .withLogFilePaths(logFilePaths)
+ .withReaderSchema(schema)
+ .withLatestInstantTime(latestMetaInstantTimestamp)
+ .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
+ .withBufferSize(BUFFER_SIZE)
+ .withSpillableMapBasePath(spillableMapDirectory)
+ .withDiskMapType(commonConfig.getSpillableDiskMapType())
+
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
+ .withLogBlockTimestamps(validInstantTimestamps)
Review comment:
this is what fences all uncommitted data from being read out of metadata
table
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
throw new HoodieIOException("Error merging records from metadata table
for key :" + key, ioe);
} finally {
if (!reuse) {
- closeOrThrow();
+ close(partitionName);
}
}
}
- private void openReadersIfNeededOrThrow() {
- try {
- openReadersIfNeeded();
- } catch (IOException e) {
- throw new HoodieIOException("Error opening readers to the Metadata
Table: ", e);
- }
- }
-
/**
* Returns a new pair of readers to the base and log files.
*/
- private void openReadersIfNeeded() throws IOException {
- if (reuse && (baseFileReader != null || logRecordScanner != null)) {
- // quickly exit out without synchronizing if reusing and readers are
already open
- return;
- }
-
- // we always force synchronization, if reuse=false, to handle concurrent
close() calls as well.
- synchronized (this) {
- if (baseFileReader != null || logRecordScanner != null) {
- return;
- }
-
- final long baseFileOpenMs;
- final long logScannerOpenMs;
-
- // Metadata is in sync till the latest completed instant on the dataset
- HoodieTimer timer = new HoodieTimer().startTimer();
- String latestInstantTime = getLatestDatasetInstantTime();
- ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() ==
1, "must be at-least one valid metadata file slice");
-
- // If the base file is present then create a reader
- Option<HoodieBaseFile> basefile =
latestFileSystemMetadataSlices.get(0).getBaseFile();
- if (basefile.isPresent()) {
- String basefilePath = basefile.get().getPath();
- baseFileReader =
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
- baseFileOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata base file from %s at instant
%s in %d ms", basefilePath,
- basefile.get().getCommitTime(), baseFileOpenMs));
- } else {
- baseFileOpenMs = 0;
- timer.endTimer();
- }
-
- // Open the log record scanner using the log files from the latest file
slice
- timer.startTimer();
- List<String> logFilePaths =
latestFileSystemMetadataSlices.get(0).getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(o -> o.getPath().toString())
- .collect(Collectors.toList());
- Option<HoodieInstant> lastInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
- String latestMetaInstantTimestamp =
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
- // Load the schema
- Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
- logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
- .withFileSystem(metaClient.getFs())
- .withBasePath(metadataBasePath)
- .withLogFilePaths(logFilePaths)
- .withReaderSchema(schema)
- .withLatestInstantTime(latestMetaInstantTimestamp)
- .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
- .withBufferSize(BUFFER_SIZE)
- .withSpillableMapBasePath(spillableMapDirectory)
- .withDiskMapType(commonConfig.getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
- .build();
-
- logScannerOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata log files from %s at instant
(dataset instant=%s, metadata instant=%s) in %d ms",
- logFilePaths, latestInstantTime, latestMetaInstantTimestamp,
logScannerOpenMs));
-
- metrics.ifPresent(metrics ->
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs +
logScannerOpenMs));
- }
- }
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>
openReadersIfNeeded(String key, String partitionName) throws IOException {
Review comment:
this method needs to be broken down
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -110,23 +119,31 @@ protected HoodieBackedTableMetadataWriter(Configuration
hadoopConf, HoodieWriteC
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(),
"File listing cannot be used for Metadata Table");
initRegistry();
- HoodieTableMetaClient datasetMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
- initialize(engineContext, datasetMetaClient);
- if (enabled) {
- // This is always called even in case the table was created for the
first time. This is because
- // initFromFilesystem() does file listing and hence may take a long
time during which some new updates
- // may have occurred on the table. Hence, calling this always ensures
that the metadata is brought in sync
- // with the active timeline.
- HoodieTimer timer = new HoodieTimer().startTimer();
- syncFromInstants(datasetMetaClient);
- metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR,
timer.endTimer()));
- }
+ this.datasetMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
+ initTransactionManager();
+ initialize(engineContext);
+ initTableMetadata();
} else {
enabled = false;
this.metrics = Option.empty();
}
}
+ /**
+ * Initialize the {@code TransactionManager} to use for metadata table.
+ *
+ * In HUDI multi writer mode, each operation will sync to metadata table
before completion. Metadata table has common
+ * base and log files to update for each operation. So we can only support
serialized operations.
+ */
+ private void initTransactionManager() {
+ // The lock location should be different from the dataset
+ Properties properties = new Properties();
+ properties.putAll(datasetWriteConfig.getProps());
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY,
properties.getProperty(FILESYSTEM_LOCK_PATH_PROP_KEY,
datasetWriteConfig.getBasePath() + "/.hoodie/.locks") + "/metadata");
Review comment:
+1
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -110,23 +119,31 @@ protected HoodieBackedTableMetadataWriter(Configuration
hadoopConf, HoodieWriteC
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(),
"File listing cannot be used for Metadata Table");
initRegistry();
- HoodieTableMetaClient datasetMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
- initialize(engineContext, datasetMetaClient);
- if (enabled) {
- // This is always called even in case the table was created for the
first time. This is because
- // initFromFilesystem() does file listing and hence may take a long
time during which some new updates
- // may have occurred on the table. Hence, calling this always ensures
that the metadata is brought in sync
- // with the active timeline.
- HoodieTimer timer = new HoodieTimer().startTimer();
- syncFromInstants(datasetMetaClient);
- metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR,
timer.endTimer()));
- }
+ this.datasetMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
+ initTransactionManager();
+ initialize(engineContext);
+ initTableMetadata();
} else {
enabled = false;
this.metrics = Option.empty();
}
}
+ /**
+ * Initialize the {@code TransactionManager} to use for metadata table.
+ *
+ * In HUDI multi writer mode, each operation will sync to metadata table
before completion. Metadata table has common
+ * base and log files to update for each operation. So we can only support
serialized operations.
+ */
+ private void initTransactionManager() {
+ // The lock location should be different from the dataset
+ Properties properties = new Properties();
+ properties.putAll(datasetWriteConfig.getProps());
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY,
properties.getProperty(FILESYSTEM_LOCK_PATH_PROP_KEY,
datasetWriteConfig.getBasePath() + "/.hoodie/.locks") + "/metadata");
Review comment:
filesystem based lock may not work on cloud storage. not sure if we can
assume this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> Metadata Table Synchronous Design
> ---------------------------------
>
> Key: HUDI-2285
> URL: https://issues.apache.org/jira/browse/HUDI-2285
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: Prashant Wason
> Assignee: Prashant Wason
> Priority: Major
> Labels: pull-request-available
>
> h2. *Motivation*
> HUDI Metadata Table version 1 (0.7 release) supports file-listing
> optimization. We intend to add support for additional information -
> record-level index(UUID), column indexes (column range index) to the metadata
> table. This requires re-architecting the table design for large scale
> (50billion+ records), synchronous operations and to reduce the reader-side
> overhead.
> # Limit the amount of sync requirement on the reader side
> # Syncing on reader side may negate the benefits of the secondary index
> # Not syncing on the reader-side simplifies design and reduces testing
> # Allow moving to a multi-writer design with operations running in separate
> pipelines
> # E.g. Clustering / Clean / Backfills in separate pipelines
> # Ease of debugging
> # Scale - Should be able to handle very large inserts - millions of keys,
> thousands of datafiles written
>
> h3. *Writer Side*
> The lifecycle of a HUDI operation will be as listed below. The example below
> shows COMMIT operation but the steps apply for all types of operations.
> # SparkHoodieWriteClient.commit(...) is called by ingestion process at time
> T1
> # Create requested instant t1.commit.requested
> # Create inflight instant t1.commit.inflight
> # Perform the write of RDD into the dataset and create the
> HoodieCommitMetadata
> # HoodieMetadataTableWriter.update(CommitMetadata, t1, WriteStatus)
> # This will perform a delta-commit into the HUDI Metadata Table updating the
> file listing, record-level index (future) and column indexes (future)
> together from the data collected in the WriteStatus.
> # This commit will complete before the commit started on the dataset will
> complete.
> # This will create the t1.deltacommit on the Metadata Table.
> # Since Metadata Table has inline clean and inline compaction, those
> additional operations may also take place at this time
> # Complete the commit by creating t1.commit
> Inline-compaction will only compact those log blocks which can be deemed
> readable as per the algorithm described in the reader-side in the next
> section.
> h3. *Reader Side*
> # List the dataset to find all completed instants - e.g. t1.commit,
> t2.commit … t10.commit
> # Since these instants are completed, their related metadata has already
> been written to the metadata table as part of respective deltacommits -
> t1.deltacommit, t2.deltacommit … t10.deltacommit
> # Find the last completed instant on the dataset - t10.commit
> # Open the FileSlice on the metadata partition with the following
> constraints:
> # Any base file with time > t10 cannot be used
> # Any log blocks whose timestamp is not in the list of completed instants
> (#1 above) cannot be used
> # Only in ingestion failure cases the latest base file (created due to
> compaction) or some log blocks may have to be neglected. In success cases,
> this process should not add extra overhead except for listing the dataset.
>
> h3. *Multi Write Support*
> Since each operation on metadata table writes to the same files (file-listing
> partition has a single FileSlice), we can only allow single-writer access to
> the metadata table. For this, the Transaction Manager is used to lock the
> table before any updates.
> In essence, each multi-writer operation will contend for the same lock to
> write updates to the metadata table before the operation completes. This may
> not even be an issue in reality as the operations will complete at different
> times and the metadata table operations should be fast.
>
> *Upgrade/Downgrade*
> The two versions (current and this new one) differ in schema and its
> complicated to check whether the table is in sync. So its simpler to
> re-bootstrap as its only the file listing which needs to be re-bootstrapped.
> h3. *Support for shards in metadata table partitions.*
> 1. There will be fixed number of shards for each Metadata Table partition.
> 2. Shards are implemented using filenames of format fileId00ABCD where ABCD
> is the shard number. This allows easy identification of the files and their
> order while still keeping the names unique.
> 3. Shards are pre-allocation during the time of bootstrap.
> 4. Currently only files partition has 1 shard. But this code is required for
> record-level-index so implemented here.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)