danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1190914223
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -619,41 +680,34 @@ private boolean
anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Opti
return false;
}
- private void
updateInitializedPartitionsInTableConfig(List<MetadataPartitionType>
partitionTypes) {
- Set<String> completedPartitions =
dataMetaClient.getTableConfig().getMetadataPartitions();
-
completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
-
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(),
String.join(",", completedPartitions));
- HoodieTableConfig.update(dataMetaClient.getFs(), new
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
- }
-
- private HoodieTableMetaClient initializeMetaClient(boolean
populateMetaFields) throws IOException {
+ private HoodieTableMetaClient initializeMetaClient() throws IOException {
return HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.MERGE_ON_READ)
.setTableName(tableName)
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.setPayloadClassName(HoodieMetadataPayload.class.getName())
.setBaseFileFormat(HoodieFileFormat.HFILE.toString())
.setRecordKeyFields(RECORD_KEY_FIELD_NAME)
- .setPopulateMetaFields(populateMetaFields)
-
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
- .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+ .setPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
+
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+ .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
Review Comment:
Fix the indentation.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,25 +863,22 @@ private interface ConvertMetadataFunction {
/**
* Processes commit metadata from data table and commits to metadata table.
*
- * @param instantTime instant time of interest.
+ * @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
- * @param <T> type of commit metadata.
- * @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
- private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (!dataWriteConfig.isMetadataTableEnabled()) {
- return;
- }
+ private void processAndCommit(String instantTime, ConvertMetadataFunction
convertMetadataFunction) {
+ ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled());
+
Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
Set<String> inflightIndexes =
getInflightMetadataPartitions(dataMetaClient.getTableConfig());
// if indexing is inflight then do not trigger table service
boolean doNotTriggerTableService =
partitionsToUpdate.stream().anyMatch(inflightIndexes::contains);
Review Comment:
Do we still need this?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -118,46 +123,32 @@ protected void initRegistry() {
}
@Override
- protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext
engineContext,
- Option<T>
actionMetadata,
- Option<String>
inflightInstantTimestamp) {
- try {
- metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
- if (registry instanceof DistributedRegistry) {
- HoodieSparkEngineContext sparkEngineContext =
(HoodieSparkEngineContext) engineContext;
- ((DistributedRegistry)
registry).register(sparkEngineContext.getJavaSparkContext());
- }
- });
+ protected void commit(String instantTime, Map<MetadataPartitionType,
HoodieData<HoodieRecord>> partitionRecordsMap) {
+ commitInternal(instantTime, partitionRecordsMap, Option.empty());
+ }
- if (enabled) {
- initializeIfNeeded(dataMetaClient, actionMetadata,
inflightInstantTimestamp);
- }
- } catch (IOException e) {
- LOG.error("Failed to initialize metadata table. Disabling the writer.",
e);
- enabled = false;
- }
+ protected void bulkCommit(
+ String instantTime, MetadataPartitionType partitionType,
HoodieData<HoodieRecord> records,
+ int fileGroupCount) {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap =
new HashMap<>();
+ partitionRecordsMap.put(partitionType, records);
+ SparkHoodieMetadataBulkInsertPartitioner partitioner = new
SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);
Review Comment:
Can bulk_insert support writing HFile directly? It seems it still writes
parquet files, so the key-value query performance may not be good?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -373,105 +356,92 @@ public List<MetadataPartitionType>
getEnabledPartitionTypes() {
return this.enabledPartitionTypes;
}
- /**
- * Initialize the metadata table if it does not exist.
- * <p>
- * If the metadata table does not exist, then file and partition listing is
used to initialize the table.
- *
- * @param engineContext
- * @param actionMetadata Action metadata types extending Avro
generated SpecificRecordBase
- * @param inflightInstantTimestamp Timestamp of an instant in progress on
the dataset. This instant is ignored
- * while deciding to initialize the metadata
table.
- */
- protected abstract <T extends SpecificRecordBase> void
initialize(HoodieEngineContext engineContext,
- Option<T>
actionMetadata,
-
Option<String> inflightInstantTimestamp);
-
- public void initTableMetadata() {
- try {
- if (this.metadata != null) {
- this.metadata.close();
- }
- this.metadata = new HoodieBackedTableMetadata(engineContext,
dataWriteConfig.getMetadataConfig(),
- dataWriteConfig.getBasePath(),
dataWriteConfig.getSpillableMapBasePath());
- this.metadataMetaClient = metadata.getMetadataMetaClient();
- } catch (Exception e) {
- throw new HoodieException("Error initializing metadata table for reads",
e);
- }
- }
-
/**
* Initialize the metadata table if needed.
*
* @param dataMetaClient - meta client for the data table
* @param actionMetadata - optional action metadata
* @param inflightInstantTimestamp - timestamp of an instant in progress on
the dataset
* @param <T> - action metadata types extending Avro
generated SpecificRecordBase
- * @throws IOException
+ * @throws IOException on errors
*/
- protected <T extends SpecificRecordBase> void
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
- Option<T>
actionMetadata,
-
Option<String> inflightInstantTimestamp) throws IOException {
+ protected <T extends SpecificRecordBase> boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+
Option<T> actionMetadata,
+
Option<String> inflightInstantTimestamp) throws IOException {
HoodieTimer timer = HoodieTimer.start();
+ List<MetadataPartitionType> partitionsToInit = new
ArrayList<>(MetadataPartitionType.values().length);
- boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+ try {
+ boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+ if (!exists) {
+ // FILES partition is always required
+ partitionsToInit.add(MetadataPartitionType.FILES);
+ }
- if (!exists) {
- // Initialize for the first time by listing partitions and files
directly from the file system
- if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+ // check if any of the enabled partition types needs to be initialized
+ // NOTE: It needs to be guarded by async index config because if that is
enabled then initialization happens through the index scheduler.
+ if (!dataWriteConfig.isMetadataAsyncIndex()) {
+ Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+ LOG.info("Async metadata indexing disabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
+ this.enabledPartitionTypes.stream()
+ .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
+ .forEach(partitionsToInit::add);
}
- return;
- }
- // if metadata table exists, then check if any of the enabled partition
types needs to be initialized
- // NOTE: It needs to be guarded by async index config because if that is
enabled then initialization happens through the index scheduler.
- if (!dataWriteConfig.isMetadataAsyncIndex()) {
- Set<String> inflightAndCompletedPartitions =
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
- LOG.info("Async metadata indexing enabled and following partitions
already initialized: " + inflightAndCompletedPartitions);
- List<MetadataPartitionType> partitionsToInit =
this.enabledPartitionTypes.stream()
- .filter(p ->
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
!MetadataPartitionType.FILES.equals(p))
- .collect(Collectors.toList());
- // if there are no partitions to initialize or there is a pending
operation, then don't initialize in this round
- if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient,
inflightInstantTimestamp)) {
- return;
+ if (partitionsToInit.isEmpty()) {
+ // No partitions to initialize
+ return true;
}
- String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
- initTableMetadata(); // re-init certain flags in BaseTableMetadata
- initializeEnabledFileGroups(dataMetaClient, createInstantTime,
partitionsToInit);
- initialCommit(createInstantTime, partitionsToInit);
- updateInitializedPartitionsInTableConfig(partitionsToInit);
+ // If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+ // Otherwise, we use the timestamp of the latest completed action.
+ String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants()
+
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
Review Comment:
`.getReverseOrderedInstants().findFirst()` -> `lastInstant` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]