[
https://issues.apache.org/jira/browse/HUDI-2285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17410724#comment-17410724
]
ASF GitHub Bot commented on HUDI-2285:
--------------------------------------
nsivabalan commented on a change in pull request #3426:
URL: https://github.com/apache/hudi/pull/3426#discussion_r703004302
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
throw new HoodieIOException("Error merging records from metadata table
for key :" + key, ioe);
} finally {
if (!reuse) {
- closeOrThrow();
+ close(partitionName);
}
}
}
- private void openReadersIfNeededOrThrow() {
- try {
- openReadersIfNeeded();
- } catch (IOException e) {
- throw new HoodieIOException("Error opening readers to the Metadata
Table: ", e);
- }
- }
-
/**
* Returns a new pair of readers to the base and log files.
*/
- private void openReadersIfNeeded() throws IOException {
- if (reuse && (baseFileReader != null || logRecordScanner != null)) {
- // quickly exit out without synchronizing if reusing and readers are
already open
- return;
- }
-
- // we always force synchronization, if reuse=false, to handle concurrent
close() calls as well.
- synchronized (this) {
- if (baseFileReader != null || logRecordScanner != null) {
- return;
- }
-
- final long baseFileOpenMs;
- final long logScannerOpenMs;
-
- // Metadata is in sync till the latest completed instant on the dataset
- HoodieTimer timer = new HoodieTimer().startTimer();
- String latestInstantTime = getLatestDatasetInstantTime();
- ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() ==
1, "must be at-least one valid metadata file slice");
-
- // If the base file is present then create a reader
- Option<HoodieBaseFile> basefile =
latestFileSystemMetadataSlices.get(0).getBaseFile();
- if (basefile.isPresent()) {
- String basefilePath = basefile.get().getPath();
- baseFileReader =
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
- baseFileOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata base file from %s at instant
%s in %d ms", basefilePath,
- basefile.get().getCommitTime(), baseFileOpenMs));
- } else {
- baseFileOpenMs = 0;
- timer.endTimer();
- }
-
- // Open the log record scanner using the log files from the latest file
slice
- timer.startTimer();
- List<String> logFilePaths =
latestFileSystemMetadataSlices.get(0).getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(o -> o.getPath().toString())
- .collect(Collectors.toList());
- Option<HoodieInstant> lastInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
- String latestMetaInstantTimestamp =
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
- // Load the schema
- Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
- logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
- .withFileSystem(metaClient.getFs())
- .withBasePath(metadataBasePath)
- .withLogFilePaths(logFilePaths)
- .withReaderSchema(schema)
- .withLatestInstantTime(latestMetaInstantTimestamp)
- .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
- .withBufferSize(BUFFER_SIZE)
- .withSpillableMapBasePath(spillableMapDirectory)
- .withDiskMapType(commonConfig.getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
- .build();
-
- logScannerOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata log files from %s at instant
(dataset instant=%s, metadata instant=%s) in %d ms",
- logFilePaths, latestInstantTime, latestMetaInstantTimestamp,
logScannerOpenMs));
-
- metrics.ifPresent(metrics ->
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs +
logScannerOpenMs));
- }
- }
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>
openReadersIfNeeded(String key, String partitionName) throws IOException {
+ return shardReaders.computeIfAbsent(partitionName, k -> {
+ try {
+ final long baseFileOpenMs;
+ final long logScannerOpenMs;
+ HoodieFileReader baseFileReader = null;
+ HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+
+ // Metadata is in sync till the latest completed instant on the dataset
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List<FileSlice> shards =
HoodieTableMetadataUtil.loadPartitionShards(metaClient, partitionName);
+ ValidationUtils.checkArgument(shards.size() == 1,
String.format("Invalid number of shards: found=%d, required=%d", shards.size(),
1));
+ final FileSlice slice =
shards.get(HoodieTableMetadataUtil.keyToShard(key, shards.size()));
+
+ // If the base file is present then create a reader
+ Option<HoodieBaseFile> basefile = slice.getBaseFile();
+ if (basefile.isPresent()) {
+ String basefilePath = basefile.get().getPath();
+ baseFileReader =
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+ baseFileOpenMs = timer.endTimer();
+ LOG.info(String.format("Opened metadata base file from %s at instant
%s in %d ms", basefilePath,
+ basefile.get().getCommitTime(), baseFileOpenMs));
+ } else {
+ baseFileOpenMs = 0;
+ timer.endTimer();
+ }
- private void close(HoodieFileReader localFileReader,
HoodieMetadataMergedLogRecordScanner localLogScanner) {
- try {
- if (localFileReader != null) {
- localFileReader.close();
- }
- if (localLogScanner != null) {
- localLogScanner.close();
+ // Open the log record scanner using the log files from the latest
file slice
+ timer.startTimer();
+ List<String> logFilePaths = slice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(o -> o.getPath().toString())
+ .collect(Collectors.toList());
+
+ // Only those log files which have a corresponding completed instant
on the dataset should be read
+ // This is because the metadata table is updated before the dataset
instants are committed.
+ HoodieActiveTimeline datasetTimeline =
datasetMetaClient.getActiveTimeline();
+ Set<String> validInstantTimestamps =
datasetTimeline.filterCompletedInstants().getInstants()
+ .map(i -> i.getTimestamp()).collect(Collectors.toSet());
+
+ // For any rollbacks and restores, we cannot neglect the instants that
they are rolling back.
+ // The rollback instant should be more recent than the start of the
timeline for it to have rolled back any
+ // instant which we have a log block for.
+ final String minInstantTime = validInstantTimestamps.isEmpty() ?
SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
+
datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstants()
+ .filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN, minInstantTime))
+ .forEach(instant -> {
+
validInstantTimestamps.addAll(HoodieTableMetadataUtil.getCommitsRolledback(instant,
datasetTimeline));
+ });
+
+ // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid
timestamp
+ validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP);
+
+ Option<HoodieInstant> lastInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
Review comment:
just to complete the argument, let me explain via an illustration.
dataset timeline:
C1.complete, C2.complete, C3.inflight, C4.complete.
Metadata timeline
C1.complete, C2.complete, C4.complete.
Few cases that could happen.
a. C3 got committed to metadata and failed before committing to actual
dataset.
a.1. rollback finished for C3 in original dataset.
a.2. rollback not yet triggered for C3 in original dataset. So, C3 is still
inflight. Unless new writes kick in, this will not be rolled back.
b. C3 is a partial failed commit. so not yet synced to metadata table.
(b) is straightforward, as we don't sync non-completed commits yet.
a.1. lastInstant will be R5 (if rollback was complete, we would have synced
that to metadata table), and validInstants will be {C1, C2, C3, C4} since we
have a rollback in original dataset for C3. R5 refers to rollback of C3.
a.2. lastInstant will be C4, and validInstants will be {C1, C2, C4}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> Metadata Table Synchronous Design
> ---------------------------------
>
> Key: HUDI-2285
> URL: https://issues.apache.org/jira/browse/HUDI-2285
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: Prashant Wason
> Assignee: Prashant Wason
> Priority: Major
> Labels: pull-request-available
>
> h2. *Motivation*
> HUDI Metadata Table version 1 (0.7 release) supports file-listing
> optimization. We intend to add support for additional information -
> record-level index(UUID), column indexes (column range index) to the metadata
> table. This requires re-architecting the table design for large scale
> (50billion+ records), synchronous operations and to reduce the reader-side
> overhead.
> # Limit the amount of sync requirement on the reader side
> # Syncing on reader side may negate the benefits of the secondary index
> # Not syncing on the reader-side simplifies design and reduces testing
> # Allow moving to a multi-writer design with operations running in separate
> pipelines
> # E.g. Clustering / Clean / Backfills in separate pipelines
> # Ease of debugging
> # Scale - Should be able to handle very large inserts - millions of keys,
> thousands of datafiles written
>
> h3. *Writer Side*
> The lifecycle of a HUDI operation will be as listed below. The example below
> shows COMMIT operation but the steps apply for all types of operations.
> # SparkHoodieWriteClient.commit(...) is called by ingestion process at time
> T1
> # Create requested instant t1.commit.requested
> # Create inflight instant t1.commit.inflight
> # Perform the write of RDD into the dataset and create the
> HoodieCommitMetadata
> # HoodieMetadataTableWriter.update(CommitMetadata, t1, WriteStatus)
> # This will perform a delta-commit into the HUDI Metadata Table updating the
> file listing, record-level index (future) and column indexes (future)
> together from the data collected in the WriteStatus.
> # This commit will complete before the commit started on the dataset will
> complete.
> # This will create the t1.deltacommit on the Metadata Table.
> # Since Metadata Table has inline clean and inline compaction, those
> additional operations may also take place at this time
> # Complete the commit by creating t1.commit
> Inline-compaction will only compact those log blocks which can be deemed
> readable as per the algorithm described in the reader-side in the next
> section.
> h3. *Reader Side*
> # List the dataset to find all completed instants - e.g. t1.commit,
> t2.commit … t10.commit
> # Since these instants are completed, their related metadata has already
> been written to the metadata table as part of respective deltacommits -
> t1.deltacommit, t2.deltacommit … t10.deltacommit
> # Find the last completed instant on the dataset - t10.commit
> # Open the FileSlice on the metadata partition with the following
> constraints:
> # Any base file with time > t10 cannot be used
> # Any log blocks whose timestamp is not in the list of completed instants
> (#1 above) cannot be used
> # Only in ingestion failure cases the latest base file (created due to
> compaction) or some log blocks may have to be neglected. In success cases,
> this process should not add extra overhead except for listing the dataset.
>
> h3. *Multi Write Support*
> Since each operation on metadata table writes to the same files (file-listing
> partition has a single FileSlice), we can only allow single-writer access to
> the metadata table. For this, the Transaction Manager is used to lock the
> table before any updates.
> In essence, each multi-writer operation will contend for the same lock to
> write updates to the metadata table before the operation completes. This may
> not even be an issue in reality as the operations will complete at different
> times and the metadata table operations should be fast.
>
> *Upgrade/Downgrade*
> The two versions (current and this new one) differ in schema and it's
> complicated to check whether the table is in sync. So it's simpler to
> re-bootstrap as it's only the file listing which needs to be re-bootstrapped.
> h3. *Support for shards in metadata table partitions.*
> 1. There will be fixed number of shards for each Metadata Table partition.
> 2. Shards are implemented using filenames of format fileId00ABCD where ABCD
> is the shard number. This allows easy identification of the files and their
> order while still keeping the names unique.
> 3. Shards are pre-allocated at the time of bootstrap.
> 4. Currently only files partition has 1 shard. But this code is required for
> record-level-index so implemented here.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)