nsivabalan commented on a change in pull request #3426:
URL: https://github.com/apache/hudi/pull/3426#discussion_r712119491
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -331,4 +301,28 @@ public HoodieTableMetaClient getMetaClient() {
public Map<String, String> stats() {
return metrics.map(m -> m.getStats(true, metaClient, this)).orElse(new
HashMap<>());
}
+
+ @Override
+ public Option<String> getSyncedInstantTime() {
+ if (metaClient != null) {
+ Option<HoodieInstant> latestInstant =
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
+ if (latestInstant.isPresent()) {
+ return Option.of(latestInstant.get().getTimestamp());
+ }
+ }
+
+ return Option.empty();
+ }
+
+ @Override
+ public Option<String> getLatestCompactionTime() {
+ if (metaClient != null) {
+ Option<HoodieInstant> latestCompaction =
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
Review comment:
nts (note to self): we filter on the commit timeline because clean, clustering, delta
commits, etc. do not produce a "commit" action; they complete as their own
action types (clean, replace commit, delta commit). So on the metadata table,
only compaction results in a "commit" instant.
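To make this concrete, here is a minimal, self-contained sketch of the idea (illustrative types and action-name strings, not Hudi's actual classes): the latest completed compaction is simply the latest completed instant whose action is "commit".

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class LatestCompaction {
  // Illustrative stand-in for a timeline instant. On the MOR metadata table,
  // writes complete as "deltacommit", cleaning as "clean", clustering as
  // "replacecommit" -- only compaction completes as a "commit" action.
  static class Instant {
    final String action;
    final String timestamp;
    Instant(String action, String timestamp) {
      this.action = action;
      this.timestamp = timestamp;
    }
  }

  // Latest completed compaction = latest completed instant with action "commit".
  static Optional<String> latestCompactionTime(List<Instant> completedInstants) {
    return completedInstants.stream()
        .filter(i -> i.action.equals("commit"))
        .map(i -> i.timestamp)
        .max(Comparator.naturalOrder());
  }
}
```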
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
##########
@@ -197,6 +197,11 @@ private void rollBackIndex() {
protected void finishRollback(HoodieRollbackMetadata rollbackMetadata)
throws HoodieIOException {
try {
+ // TODO: Potential error here - rollbacks have already completed here so
if the syncTableMetadata fails,
Review comment:
Sorry, @leesf, can you please clarify your question?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -381,6 +387,57 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
return partitionToFileStatus;
}
+ /**
+ * Initialize shards for a partition.
+ *
+ * Each shard is a single log file with the following format:
+ * <fileIdPrefix>ABCD
+ * where ABCD are digits. This allows up to 9999 shards.
+ *
+ * Example:
+ * fc9f18eb-6049-4f47-bc51-23884bef0001
+ * fc9f18eb-6049-4f47-bc51-23884bef0002
Review comment:
@nbalajee: these shards/buckets are instantiated by the driver based on the
configured partitions and bucket counts per partition. Not sure how the executor
is involved here; can you help me understand?
@vinothchandar: sorry, I don't understand your point on hash partitioning. Can
you help me understand, please?
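For reference, the shard file-id scheme described in the javadoc above (32-char prefix plus a zero-padded 4-digit index) can be sketched like this — illustrative code, not the PR's exact implementation:

```java
class ShardFileIds {
  // A shard fileId is a fixed 32-char prefix plus a zero-padded 4-digit
  // index, so at most 9999 shards can be addressed per partition.
  static String shardFileId(String fileIdPrefix, int shardIndex) {
    if (shardIndex < 1 || shardIndex > 9999) {
      throw new IllegalArgumentException("shard index must be in [1, 9999]");
    }
    return String.format("%s%04d", fileIdPrefix, shardIndex);
  }
}
```

For a 36-char UUID-style fileId, taking the first 32 characters as the prefix keeps the shard fileIds the same length as a regular fileId.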
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -157,6 +147,37 @@ protected void commit(List<HoodieRecord> records, String
partitionName, String i
.lastInstant().map(HoodieInstant::getTimestamp);
}
+ /**
+ * Perform a compaction on the Metadata Table.
+ *
+ * We cannot perform compaction if there are previous inflight operations on
the dataset. This is because
+ * a compacted metadata base file at time Tx should represent all the
actions on the dataset till time Tx.
+ */
+ private void compactIfNecessary(SparkRDDWriteClient writeClient, String
instantTime) {
+ List<HoodieInstant> pendingInstants =
datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested().findInstantsBefore(instantTime)
+ .getInstants().collect(Collectors.toList());
+ if (!pendingInstants.isEmpty()) {
Review comment:
@nbalajee: are you talking about compaction of the data table or the metadata
table? We expect that if compaction of the data table fails, there will be
continuous retries; if not, liveness will not be guaranteed. But in general, we
need to think through this and see if we can relax this constraint. We will
take this up as a follow-up.
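The constraint being discussed — only compact the metadata table when no earlier dataset instant is still pending — can be sketched as follows (illustrative types, not Hudi's actual timeline API):

```java
import java.util.List;

class CompactionFence {
  // Illustrative instant: a timestamp plus whether it has completed.
  static class Instant {
    final String timestamp;
    final boolean completed;
    Instant(String timestamp, boolean completed) {
      this.timestamp = timestamp;
      this.completed = completed;
    }
  }

  // Compaction at `instantTime` is safe only if every dataset instant that
  // started before it has completed, so the compacted base file at time Tx
  // really reflects all actions on the dataset up to Tx.
  static boolean canCompact(List<Instant> datasetTimeline, String instantTime) {
    return datasetTimeline.stream()
        .filter(i -> i.timestamp.compareTo(instantTime) < 0)
        .allMatch(i -> i.completed);
  }
}
```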
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -381,6 +387,57 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
return partitionToFileStatus;
}
+ /**
+ * Initialize shards for a partition.
+ *
+ * Each shard is a single log file with the following format:
+ * <fileIdPrefix>ABCD
+ * where ABCD are digits. This allows up to 9999 shards.
+ *
+ * Example:
+ * fc9f18eb-6049-4f47-bc51-23884bef0001
+ * fc9f18eb-6049-4f47-bc51-23884bef0002
+ */
+ private void initializeShards(HoodieTableMetaClient datasetMetaClient,
String partition, String instantTime,
+ int shardCount) throws IOException {
+ ValidationUtils.checkArgument(shardCount <= 9999, "Maximum 9999 shards are
supported.");
+
+ final String newFileId = FSUtils.createNewFileIdPfx();
+ final String newFileIdPrefix = newFileId.substring(0, 32);
+ final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
+ blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
+ final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0],
blockHeader);
+
+ LOG.info(String.format("Creating %d shards for partition %s with base
fileId %s at instant time %s",
+ shardCount, partition, newFileId, instantTime));
+ for (int i = 0; i < shardCount; ++i) {
+ // Generate a indexed fileId for each shard and write a log block into
it to create the file.
+ final String shardFileId = String.format("%s%04d", newFileIdPrefix, i +
1);
+ ValidationUtils.checkArgument(newFileId.length() ==
shardFileId.length(), "FileId should be of length " + newFileId.length());
+ try {
+ HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(),
partition))
+ .withFileId(shardFileId).overBaseCommit(instantTime)
+ .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
+ .withFileSize(0L)
+ .withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
+ .withFs(datasetMetaClient.getFs())
+ .withRolloverLogWriteToken(FSUtils.makeWriteToken(0, 0, 0))
+ .withLogWriteToken(FSUtils.makeWriteToken(0, 0, 0))
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
Review comment:
As you might know, we fence metadata table compaction against any inflight
commits in the data table. So any new write for a new shard or bucket has to
start with a log file and cannot go into a base file. If we don't create a log
file here, a new write to a new bucket might start by creating a base file
first, and since we fence compaction, we can't create a base file here.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -340,4 +345,46 @@ private static void
processRollbackMetadata(HoodieRollbackMetadata rollbackMetad
return records;
}
+
+ /**
+ * Map a key to a shard.
+ *
+ * Note: For hashing, the algorithm is same as String.hashCode() but is
being defined here as hashCode()
+ * implementation is not guaranteed by the JVM to be consistent across JVM
versions and implementations.
+ *
+ * @param str
+ * @return An integer hash of the given string
+ */
+ public static int keyToShard(String str, int numShards) {
+ int h = 0;
Review comment:
But this is what I found in the Java SDK source code for String.hashCode() as
well, so it should be fine. That's why we take the absolute value below and
calculate the right bucket.
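A self-contained sketch of this scheme (illustrative, not the PR's exact code): the hash is pinned to String.hashCode()'s formula so bucketing stays stable across JVM versions, and the sketch masks the sign bit instead of calling Math.abs(), since Math.abs(Integer.MIN_VALUE) is still negative.

```java
class ShardHash {
  // Same formula as String.hashCode() (h = 31*h + c), written out here so
  // the result never depends on a particular JVM's implementation.
  static int hash(String s) {
    int h = 0;
    for (int i = 0; i < s.length(); i++) {
      h = 31 * h + s.charAt(i);
    }
    return h;
  }

  // Map a key to a shard index in [0, numShards). The sign bit is masked
  // off so the modulo result is always non-negative.
  static int keyToShard(String key, int numShards) {
    return (hash(key) & Integer.MAX_VALUE) % numShards;
  }
}
```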
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -96,6 +94,11 @@ public SparkRDDWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeC
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig
writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
+ if (config.useFileListingMetadata()) {
Review comment:
As of now, it's called isMetadataTableEnabled.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -396,37 +408,56 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
/**
- * Sync the Metadata Table from the instants created on the dataset.
+ * Initialize shards for a partition.
*
- * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+ * Each shard is a single log file with the following format:
+ * <fileIdPrefix>ABCD
+ * where ABCD are digits. This allows up to 9999 shards.
+ *
+ * Example:
+ * fc9f18eb-6049-4f47-bc51-23884bef0001
+ * fc9f18eb-6049-4f47-bc51-23884bef0002
*/
- private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
- ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it
is not enabled");
- // (re) init the metadata for reading.
- initTableMetadata();
- try {
- List<HoodieInstant> instantsToSync =
metadata.findInstantsToSyncForWriter();
- if (instantsToSync.isEmpty()) {
- return;
- }
-
- LOG.info("Syncing " + instantsToSync.size() + " instants to metadata
table: " + instantsToSync);
-
- // Read each instant in order and sync it to metadata table
- for (HoodieInstant instant : instantsToSync) {
- LOG.info("Syncing instant " + instant + " to metadata table");
-
- Option<List<HoodieRecord>> records =
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant,
getLatestSyncedInstantTime());
- if (records.isPresent()) {
- commit(records.get(), MetadataPartitionType.FILES.partitionPath(),
instant.getTimestamp());
- }
+ private void initializeShards(HoodieTableMetaClient datasetMetaClient,
String partition, String instantTime,
+ int shardCount) throws IOException {
+ ValidationUtils.checkArgument(shardCount <= 9999, "Maximum 9999 shards are
supported.");
+
+ final String newFileId = FSUtils.createNewFileIdPfx();
+ final String newFileIdPrefix = newFileId.substring(0, 32);
+ final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
+ blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
+ final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0],
blockHeader);
Review comment:
yes. you are right.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -40,17 +40,10 @@
// Enable the internal Metadata Table which saves file listings
public static final ConfigProperty<Boolean> METADATA_ENABLE_PROP =
ConfigProperty
.key(METADATA_PREFIX + ".enable")
- .defaultValue(false)
+ .defaultValue(true)
Review comment:
no
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +70,20 @@ protected HoodieSparkTable(HoodieWriteConfig config,
HoodieEngineContext context
protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext
context) {
return SparkHoodieIndex.createIndex(config);
}
+
+ @Override
+ public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+ if (!config.useFileListingMetadata()) {
+ return Option.empty();
+ }
+
+ try {
+ if (!metaClient.getFs().exists(new
Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
Review comment:
SparkRDDWriteClient instantiation actually triggers bootstrap the first time
metadata is enabled, so I'm not really sure in what scenario we would hit this
case where metadata is enabled but the table does not exist.
@prashantwason @nbalajee: do you know the reason why we have this check here?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
throw new HoodieIOException("Error merging records from metadata table
for key :" + key, ioe);
} finally {
if (!reuse) {
- closeOrThrow();
+ close(partitionName);
}
}
}
- private void openReadersIfNeededOrThrow() {
- try {
- openReadersIfNeeded();
- } catch (IOException e) {
- throw new HoodieIOException("Error opening readers to the Metadata
Table: ", e);
- }
- }
-
/**
* Returns a new pair of readers to the base and log files.
*/
- private void openReadersIfNeeded() throws IOException {
- if (reuse && (baseFileReader != null || logRecordScanner != null)) {
- // quickly exit out without synchronizing if reusing and readers are
already open
- return;
- }
-
- // we always force synchronization, if reuse=false, to handle concurrent
close() calls as well.
- synchronized (this) {
- if (baseFileReader != null || logRecordScanner != null) {
- return;
- }
-
- final long baseFileOpenMs;
- final long logScannerOpenMs;
-
- // Metadata is in sync till the latest completed instant on the dataset
- HoodieTimer timer = new HoodieTimer().startTimer();
- String latestInstantTime = getLatestDatasetInstantTime();
- ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() ==
1, "must be at-least one valid metadata file slice");
-
- // If the base file is present then create a reader
- Option<HoodieBaseFile> basefile =
latestFileSystemMetadataSlices.get(0).getBaseFile();
- if (basefile.isPresent()) {
- String basefilePath = basefile.get().getPath();
- baseFileReader =
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
- baseFileOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata base file from %s at instant
%s in %d ms", basefilePath,
- basefile.get().getCommitTime(), baseFileOpenMs));
- } else {
- baseFileOpenMs = 0;
- timer.endTimer();
- }
-
- // Open the log record scanner using the log files from the latest file
slice
- timer.startTimer();
- List<String> logFilePaths =
latestFileSystemMetadataSlices.get(0).getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(o -> o.getPath().toString())
- .collect(Collectors.toList());
- Option<HoodieInstant> lastInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
- String latestMetaInstantTimestamp =
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
- // Load the schema
- Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
- logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
- .withFileSystem(metaClient.getFs())
- .withBasePath(metadataBasePath)
- .withLogFilePaths(logFilePaths)
- .withReaderSchema(schema)
- .withLatestInstantTime(latestMetaInstantTimestamp)
- .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
- .withBufferSize(BUFFER_SIZE)
- .withSpillableMapBasePath(spillableMapDirectory)
- .withDiskMapType(commonConfig.getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
- .build();
-
- logScannerOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata log files from %s at instant
(dataset instant=%s, metadata instant=%s) in %d ms",
- logFilePaths, latestInstantTime, latestMetaInstantTimestamp,
logScannerOpenMs));
-
- metrics.ifPresent(metrics ->
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs +
logScannerOpenMs));
- }
- }
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>
openReadersIfNeeded(String key, String partitionName) throws IOException {
+ return shardReaders.computeIfAbsent(partitionName, k -> {
+ try {
+ final long baseFileOpenMs;
+ final long logScannerOpenMs;
+ HoodieFileReader baseFileReader = null;
+ HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+
+ // Metadata is in sync till the latest completed instant on the dataset
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List<FileSlice> shards =
HoodieTableMetadataUtil.loadPartitionShards(metaClient, partitionName);
+ ValidationUtils.checkArgument(shards.size() == 1,
String.format("Invalid number of shards: found=%d, required=%d", shards.size(),
1));
+ final FileSlice slice =
shards.get(HoodieTableMetadataUtil.keyToShard(key, shards.size()));
+
+ // If the base file is present then create a reader
+ Option<HoodieBaseFile> basefile = slice.getBaseFile();
+ if (basefile.isPresent()) {
Review comment:
yes
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
throw new HoodieIOException("Error merging records from metadata table
for key :" + key, ioe);
} finally {
if (!reuse) {
- closeOrThrow();
+ close(partitionName);
}
}
}
- private void openReadersIfNeededOrThrow() {
- try {
- openReadersIfNeeded();
- } catch (IOException e) {
- throw new HoodieIOException("Error opening readers to the Metadata
Table: ", e);
- }
- }
-
/**
* Returns a new pair of readers to the base and log files.
*/
- private void openReadersIfNeeded() throws IOException {
- if (reuse && (baseFileReader != null || logRecordScanner != null)) {
- // quickly exit out without synchronizing if reusing and readers are
already open
- return;
- }
-
- // we always force synchronization, if reuse=false, to handle concurrent
close() calls as well.
- synchronized (this) {
- if (baseFileReader != null || logRecordScanner != null) {
- return;
- }
-
- final long baseFileOpenMs;
- final long logScannerOpenMs;
-
- // Metadata is in sync till the latest completed instant on the dataset
- HoodieTimer timer = new HoodieTimer().startTimer();
- String latestInstantTime = getLatestDatasetInstantTime();
- ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() ==
1, "must be at-least one valid metadata file slice");
-
- // If the base file is present then create a reader
- Option<HoodieBaseFile> basefile =
latestFileSystemMetadataSlices.get(0).getBaseFile();
- if (basefile.isPresent()) {
- String basefilePath = basefile.get().getPath();
- baseFileReader =
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
- baseFileOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata base file from %s at instant
%s in %d ms", basefilePath,
- basefile.get().getCommitTime(), baseFileOpenMs));
- } else {
- baseFileOpenMs = 0;
- timer.endTimer();
- }
-
- // Open the log record scanner using the log files from the latest file
slice
- timer.startTimer();
- List<String> logFilePaths =
latestFileSystemMetadataSlices.get(0).getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(o -> o.getPath().toString())
- .collect(Collectors.toList());
- Option<HoodieInstant> lastInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
- String latestMetaInstantTimestamp =
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
- // Load the schema
- Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
- logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
- .withFileSystem(metaClient.getFs())
- .withBasePath(metadataBasePath)
- .withLogFilePaths(logFilePaths)
- .withReaderSchema(schema)
- .withLatestInstantTime(latestMetaInstantTimestamp)
- .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
- .withBufferSize(BUFFER_SIZE)
- .withSpillableMapBasePath(spillableMapDirectory)
- .withDiskMapType(commonConfig.getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
- .build();
-
- logScannerOpenMs = timer.endTimer();
- LOG.info(String.format("Opened metadata log files from %s at instant
(dataset instant=%s, metadata instant=%s) in %d ms",
- logFilePaths, latestInstantTime, latestMetaInstantTimestamp,
logScannerOpenMs));
-
- metrics.ifPresent(metrics ->
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs +
logScannerOpenMs));
- }
- }
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>
openReadersIfNeeded(String key, String partitionName) throws IOException {
+ return shardReaders.computeIfAbsent(partitionName, k -> {
+ try {
+ final long baseFileOpenMs;
+ final long logScannerOpenMs;
+ HoodieFileReader baseFileReader = null;
+ HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+
+ // Metadata is in sync till the latest completed instant on the dataset
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List<FileSlice> shards =
HoodieTableMetadataUtil.loadPartitionShards(metaClient, partitionName);
+ ValidationUtils.checkArgument(shards.size() == 1,
String.format("Invalid number of shards: found=%d, required=%d", shards.size(),
1));
+ final FileSlice slice =
shards.get(HoodieTableMetadataUtil.keyToShard(key, shards.size()));
+
+ // If the base file is present then create a reader
+ Option<HoodieBaseFile> basefile = slice.getBaseFile();
+ if (basefile.isPresent()) {
+ String basefilePath = basefile.get().getPath();
+ baseFileReader =
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+ baseFileOpenMs = timer.endTimer();
+ LOG.info(String.format("Opened metadata base file from %s at instant
%s in %d ms", basefilePath,
+ basefile.get().getCommitTime(), baseFileOpenMs));
+ } else {
+ baseFileOpenMs = 0;
+ timer.endTimer();
+ }
- private void close(HoodieFileReader localFileReader,
HoodieMetadataMergedLogRecordScanner localLogScanner) {
- try {
- if (localFileReader != null) {
- localFileReader.close();
- }
- if (localLogScanner != null) {
- localLogScanner.close();
+ // Open the log record scanner using the log files from the latest
file slice
+ timer.startTimer();
+ List<String> logFilePaths = slice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(o -> o.getPath().toString())
+ .collect(Collectors.toList());
+
+ // Only those log files which have a corresponding completed instant
on the dataset should be read
+ // This is because the metadata table is updated before the dataset
instants are committed.
+ HoodieActiveTimeline datasetTimeline =
datasetMetaClient.getActiveTimeline();
+ Set<String> validInstantTimestamps =
datasetTimeline.filterCompletedInstants().getInstants()
+ .map(i -> i.getTimestamp()).collect(Collectors.toSet());
+
+ // For any rollbacks and restores, we cannot neglect the instants that
they are rolling back.
+ // The rollback instant should be more recent than the start of the
timeline for it to have rolled back any
+ // instant which we have a log block for.
+ final String minInstantTime = validInstantTimestamps.isEmpty() ?
SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
+
datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstants()
+ .filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN, minInstantTime))
+ .forEach(instant -> {
+
validInstantTimestamps.addAll(HoodieTableMetadataUtil.getCommitsRolledback(instant,
datasetTimeline));
+ });
+
+ // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid
timestamp
+ validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP);
+
+ Option<HoodieInstant> lastInstant =
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+ String latestMetaInstantTimestamp =
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
+ // Load the schema
+ Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+ HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
+ logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
+ .withFileSystem(metaClient.getFs())
+ .withBasePath(metadataBasePath)
+ .withLogFilePaths(logFilePaths)
+ .withReaderSchema(schema)
+ .withLatestInstantTime(latestMetaInstantTimestamp)
+ .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
+ .withBufferSize(BUFFER_SIZE)
+ .withSpillableMapBasePath(spillableMapDirectory)
+ .withDiskMapType(commonConfig.getSpillableDiskMapType())
+
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
+ .withLogBlockTimestamps(validInstantTimestamps)
Review comment:
yes.
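The filtering in the diff above — only read log blocks whose instant time corresponds to a completed dataset instant, plus the bootstrap sentinel — can be sketched as follows (illustrative names; the sentinel value is an assumed stand-in for Hudi's internal SOLO_COMMIT_TIMESTAMP):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class ValidLogBlocks {
  // Assumed stand-in for Hudi's bootstrap sentinel timestamp.
  static final String SOLO_COMMIT_TIMESTAMP = "0000000000000";

  // The metadata table is updated before the dataset instant completes, so a
  // log block is readable only if its instant actually completed on the dataset.
  static List<String> readableBlockTimes(List<String> blockInstantTimes,
                                         Set<String> completedDatasetInstants) {
    Set<String> valid = new HashSet<>(completedDatasetInstants);
    valid.add(SOLO_COMMIT_TIMESTAMP); // bootstrap writes are always valid
    return blockInstantTimes.stream()
        .filter(valid::contains)
        .collect(Collectors.toList());
  }
}
```

Instants restored by completed rollbacks/restores would additionally be merged into the valid set, as the diff shows.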
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -110,23 +119,31 @@ protected HoodieBackedTableMetadataWriter(Configuration
hadoopConf, HoodieWriteC
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(),
"File listing cannot be used for Metadata Table");
initRegistry();
- HoodieTableMetaClient datasetMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
- initialize(engineContext, datasetMetaClient);
- if (enabled) {
- // This is always called even in case the table was created for the
first time. This is because
- // initFromFilesystem() does file listing and hence may take a long
time during which some new updates
- // may have occurred on the table. Hence, calling this always ensures
that the metadata is brought in sync
- // with the active timeline.
- HoodieTimer timer = new HoodieTimer().startTimer();
- syncFromInstants(datasetMetaClient);
- metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR,
timer.endTimer()));
- }
+ this.datasetMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
+ initTransactionManager();
+ initialize(engineContext);
+ initTableMetadata();
} else {
enabled = false;
this.metrics = Option.empty();
}
}
+ /**
+ * Initialize the {@code TransactionManager} to use for metadata table.
+ *
+ * In HUDI multi writer mode, each operation will sync to metadata table
before completion. Metadata table has common
+ * base and log files to update for each operation. So we can only support
serialized operations.
+ */
+ private void initTransactionManager() {
+ // The lock location should be different from the dataset
+ Properties properties = new Properties();
+ properties.putAll(datasetWriteConfig.getProps());
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY,
properties.getProperty(FILESYSTEM_LOCK_PATH_PROP_KEY,
datasetWriteConfig.getBasePath() + "/.hoodie/.locks") + "/metadata");
Review comment:
So, how do you suggest we go about this? I am thinking we can do something like
this: determine the type of lock configured and automatically derive the
metadata table properties accordingly.

For FileSystemBasedLock: the user has to set "hoodie.write.lock.filesystem.path"
for the data table, and we can append "/metadata" to that value. For example, if
someone sets the data table config to "hudi_path/.locks", we infer the metadata
table path as "hudi_path/.locks/metadata".

For metastore-based locks: I could not think of a way to auto-derive the
metadata table configs, because we have 3 configs for data table locks (database
name, table name and metastore URIs) and there isn't much we can infer from
them. We might have to add 3 new props (something like below) and expect users
to set them as well:
"hoodie.write.lock.hivemetastore.database" ->
"hoodie.metadata.write.lock.hivemetastore.database"
"hoodie.write.lock.hivemetastore.table" ->
"hoodie.metadata.write.lock.hivemetastore.table"
"hoodie.write.lock.hivemetastore.uris" ->
"hoodie.metadata.write.lock.hivemetastore.uris"

For ZooKeeper-based locks: we can infer from "hoodie.write.lock.zookeeper.lock_key"
by suffixing "_metadata" to the value set by the user.

I am not addressing this issue right now; once we have some consensus, I will
work on the fix.
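The filesystem-lock part of this could look roughly like the following sketch (the property key name is the one used in the diff above; treat the helper and its fallback as illustrative, not the final fix):

```java
import java.util.Properties;

class MetadataLockProps {
  static final String FILESYSTEM_LOCK_PATH_PROP_KEY = "hoodie.write.lock.filesystem.path";

  // Derive the metadata table's lock path from the data table's lock config
  // by appending "/metadata", falling back to <basePath>/.hoodie/.locks when
  // the user did not set an explicit lock path. The lock location must differ
  // from the data table's so the two tables don't contend on the same lock.
  static Properties deriveForMetadataTable(Properties dataTableProps, String basePath) {
    Properties derived = new Properties();
    derived.putAll(dataTableProps);
    String dataLockPath = dataTableProps.getProperty(
        FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
    derived.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, dataLockPath + "/metadata");
    return derived;
  }
}
```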
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -492,8 +527,26 @@ public void update(HoodieRestoreMetadata restoreMetadata,
String instantTime) {
@Override
public void update(HoodieRollbackMetadata rollbackMetadata, String
instantTime) {
if (enabled) {
- List<HoodieRecord> records =
HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime,
metadata.getSyncedInstantTime());
- commit(records, MetadataPartitionType.FILES.partitionPath(),
instantTime);
+ this.txnManager.beginTransaction(Option.of(new
HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime)),
+ Option.empty());
+ try {
+ // Is this rollback of an instant that has been synced to the metadata
table?
+ String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
+ boolean wasSynced = metaClient.getActiveTimeline().containsInstant(new
HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant));
+ if (!wasSynced) {
+ // A compaction may have taken place on metadata table which would
have included this instant being rolled back.
Review comment:
Couldn't think of ways to simplify this. Let's jam sometime on how we can relax
the compaction fencing.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -101,7 +94,7 @@ protected void initialize(HoodieEngineContext engineContext,
HoodieTableMetaClie
@Override
protected void commit(List<HoodieRecord> records, String partitionName,
String instantTime) {
Review comment:
Yes, this needs some rethinking for the record level index. For instance, would
commit metadata alone suffice to hold all the info for the record level index
that needs to be ingested into the metadata table, or do we need to take in
WriteStatus or something as well? Will leave this as is for now. Once we have a
more detailed understanding of these partitions (range index, record level
index, etc.), we can fix this interface.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -299,6 +302,7 @@ protected void completeCompaction(HoodieCommitMetadata
metadata, JavaRDD<WriteSt
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction");
List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
finalizeWrite(table, compactionCommitTime, writeStats);
+ table.getMetadataWriter().ifPresent(w -> w.update(metadata,
compactionCommitTime));
Review comment:
Yes, I have audited this once; will double check though.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -304,6 +315,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext
engineContext, Hoodi
.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
initTableMetadata();
+ initializeShards(datasetMetaClient,
MetadataPartitionType.FILES.partitionPath(), createInstantTime, 1);
Review comment:
I guess the intention here is creating buckets for the record level index and
any other such partitions we might have. Will change it to buckets.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -396,37 +408,56 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
}
/**
- * Sync the Metadata Table from the instants created on the dataset.
+ * Initialize shards for a partition.
*
- * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+ * Each shard is a single log file with the following format:
+ * <fileIdPrefix>ABCD
+ * where ABCD are digits. This allows up to 9999 shards.
+ *
+ * Example:
+ * fc9f18eb-6049-4f47-bc51-23884bef0001
+ * fc9f18eb-6049-4f47-bc51-23884bef0002
*/
- private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
- ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it
is not enabled");
- // (re) init the metadata for reading.
- initTableMetadata();
- try {
- List<HoodieInstant> instantsToSync =
metadata.findInstantsToSyncForWriter();
- if (instantsToSync.isEmpty()) {
- return;
- }
-
- LOG.info("Syncing " + instantsToSync.size() + " instants to metadata
table: " + instantsToSync);
-
- // Read each instant in order and sync it to metadata table
- for (HoodieInstant instant : instantsToSync) {
- LOG.info("Syncing instant " + instant + " to metadata table");
-
- Option<List<HoodieRecord>> records =
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant,
getLatestSyncedInstantTime());
- if (records.isPresent()) {
- commit(records.get(), MetadataPartitionType.FILES.partitionPath(),
instant.getTimestamp());
- }
+ private void initializeShards(HoodieTableMetaClient datasetMetaClient,
String partition, String instantTime,
+ int shardCount) throws IOException {
+ ValidationUtils.checkArgument(shardCount <= 9999, "Maximum 9999 shards are
supported.");
+
+ final String newFileId = FSUtils.createNewFileIdPfx();
+ final String newFileIdPrefix = newFileId.substring(0, 32);
+ final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
+ blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
+ final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0],
blockHeader);
+
+ LOG.info(String.format("Creating %d shards for partition %s with base
fileId %s at instant time %s",
+ shardCount, partition, newFileId, instantTime));
+ for (int i = 0; i < shardCount; ++i) {
+ // Generate a indexed fileId for each shard and write a log block into
it to create the file.
+ final String shardFileId = String.format("%s%04d", newFileIdPrefix, i +
1);
+ ValidationUtils.checkArgument(newFileId.length() ==
shardFileId.length(), "FileId should be of length " + newFileId.length());
+ try {
+ HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(),
partition))
+ .withFileId(shardFileId).overBaseCommit(instantTime)
+ .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
+ .withFileSize(0L)
Review comment:
Yes, for the first log block (even in the regular flow), we do the same:
```
return HoodieLogFormat.newWriterBuilder()
    .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
    ....
    .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
    ....
```
When there is no latestLogFile for a given file group, we set the size to 0.