prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192160408
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -373,105 +356,92 @@ public List<MetadataPartitionType> getEnabledPartitionTypes() {
return this.enabledPartitionTypes;
}
- /**
- * Initialize the metadata table if it does not exist.
- * <p>
- * If the metadata table does not exist, then file and partition listing is
used to initialize the table.
- *
- * @param engineContext
- * @param actionMetadata Action metadata types extending Avro
generated SpecificRecordBase
- * @param inflightInstantTimestamp Timestamp of an instant in progress on
the dataset. This instant is ignored
- * while deciding to initialize the metadata
table.
- */
- protected abstract <T extends SpecificRecordBase> void
initialize(HoodieEngineContext engineContext,
- Option<T>
actionMetadata,
-
Option<String> inflightInstantTimestamp);
-
- public void initTableMetadata() {
- try {
- if (this.metadata != null) {
- this.metadata.close();
- }
- this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(),
- dataWriteConfig.getBasePath(),
dataWriteConfig.getSpillableMapBasePath());
- this.metadataMetaClient = metadata.getMetadataMetaClient();
- } catch (Exception e) {
- throw new HoodieException("Error initializing metadata table for reads",
e);
- }
- }
-
/**
* Initialize the metadata table if needed.
*
* @param dataMetaClient - meta client for the data table
* @param actionMetadata - optional action metadata
* @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset
* @param <T> - action metadata types extending Avro generated SpecificRecordBase
- * @throws IOException
+ * @throws IOException on errors
*/
- protected <T extends SpecificRecordBase> void
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
- Option<T>
actionMetadata,
-
Option<String> inflightInstantTimestamp) throws IOException {
+ protected <T extends SpecificRecordBase> boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+
Option<T> actionMetadata,
+
Option<String> inflightInstantTimestamp) throws IOException {
HoodieTimer timer = HoodieTimer.start();
+ List<MetadataPartitionType> partitionsToInit = new
ArrayList<>(MetadataPartitionType.values().length);
- boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+ try {
+ boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+ if (!exists) {
+ // FILES partition is always required
+ partitionsToInit.add(MetadataPartitionType.FILES);
+ }
- if (!exists) {
- // Initialize for the first time by listing partitions and files
directly from the file system
- if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+ // check if any of the enabled partition types needs to be initialized
+ // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler.
+ if (!dataWriteConfig.isMetadataAsyncIndex()) {
+ Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+ LOG.info("Async metadata indexing disabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
+ this.enabledPartitionTypes.stream()
+ .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
+ .forEach(partitionsToInit::add);
}
- return;
- }
- // if metadata table exists, then check if any of the enabled partition
types needs to be initialized
- // NOTE: It needs to be guarded by async index config because if that is
enabled then initialization happens through the index scheduler.
- if (!dataWriteConfig.isMetadataAsyncIndex()) {
- Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
- LOG.info("Async metadata indexing enabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
- List<MetadataPartitionType> partitionsToInit =
this.enabledPartitionTypes.stream()
- .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
- .collect(Collectors.toList());
- // if there are no partitions to initialize or there is a pending
operation, then don't initialize in this round
- if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient,
inflightInstantTimestamp)) {
- return;
+ if (partitionsToInit.isEmpty()) {
+ // No partitions to initialize
+ return true;
}
- String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
- initTableMetadata(); // re-init certain flags in BaseTableMetadata
- initializeEnabledFileGroups(dataMetaClient, createInstantTime,
partitionsToInit);
- initialCommit(createInstantTime, partitionsToInit);
- updateInitializedPartitionsInTableConfig(partitionsToInit);
+ // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+ // Otherwise, we use the timestamp of the latest completed action.
+ String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants()
+
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]