This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch partition-bucket-index
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 3e1687728fece9f93b8c86d75b43a39ca4620e13
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Mar 20 07:50:59 2025 +0530

    [HUDI-9013] Add backwards compatible MDT writer support and reader support with tbl v6 (#12948)
---
 .../apache/hudi/cli/commands/MetadataCommand.java |  10 +-
 .../metadata/HoodieBackedTableMetadataWriter.java |  67 ++--
 ...ieBackedTableMetadataWriterTableVersionSix.java | 337 +++++++++++++++++++++
 .../org/apache/hudi/metrics/HoodieMetrics.java    |   9 +-
 .../FlinkHoodieBackedTableMetadataWriter.java     |   4 +-
 .../apache/hudi/client/SparkRDDWriteClient.java   |   6 +-
 ...ieBackedTableMetadataWriterTableVersionSix.java | 164 ++++++++++
 .../hudi/metadata/SparkMetadataWriterFactory.java |  55 ++++
 .../org/apache/hudi/table/HoodieSparkTable.java   |   6 +-
 .../functional/TestHoodieBackedTableMetadata.java |  83 +++--
 .../client/functional/TestHoodieMetadataBase.java |   3 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java    |  16 +
 .../hudi/common/testutils/HoodieTestUtils.java    |   4 +
 .../procedures/CreateMetadataTableProcedure.scala |   4 +-
 .../procedures/InitMetadataTableProcedure.scala   |   4 +-
 .../TestHoodieSparkMergeOnReadTableRollback.java  |  11 +-
 .../hudi/functional/ColumnStatIndexTestBase.scala |  37 ++-
 .../hudi/functional/RecordLevelIndexTestBase.scala |  4 +-
 .../hudi/functional/TestColumnStatsIndex.scala    | 115 ++++---
 .../hudi/functional/TestRecordLevelIndex.scala    |  17 +-
 .../TestRecordLevelIndexTableVersionSix.scala     |  29 ++
 21 files changed, 856 insertions(+), 129 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index cabd9fdcaf6..5b98700d04f 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -37,7 +37,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
@@ -116,7 +116,7 @@ public class MetadataCommand {
   public String create(
       @ShellOption(value = "--sparkMaster", defaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master
   ) throws Exception {
-    HoodieCLI.getTableMetaClient();
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     StoragePath metadataPath = new StoragePath(getMetadataTableBasePath(HoodieCLI.basePath));
     try {
       List<StoragePathInfo> pathInfoList = HoodieCLI.storage.listDirectEntries(metadataPath);
@@ -131,7 +131,7 @@ public class MetadataCommand {
     HoodieTimer timer = HoodieTimer.start();
     HoodieWriteConfig writeConfig = getWriteConfig();
     initJavaSparkContext(Option.of(master));
-    try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) {
+    try (HoodieTableMetadataWriter writer = SparkMetadataWriterFactory.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc), metaClient.getTableConfig())) {
       return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0);
     }
   }
@@ -163,7 +163,7 @@ public class MetadataCommand {
   public String
init(@ShellOption(value = "--sparkMaster", defaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master, @ShellOption(value = {"--readonly"}, defaultValue = "false", help = "Open in read-only mode") final boolean readOnly) throws Exception { - HoodieCLI.getTableMetaClient(); + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); StoragePath metadataPath = new StoragePath(getMetadataTableBasePath(HoodieCLI.basePath)); try { HoodieCLI.storage.listDirectEntries(metadataPath); @@ -176,7 +176,7 @@ public class MetadataCommand { if (!readOnly) { HoodieWriteConfig writeConfig = getWriteConfig(); initJavaSparkContext(Option.of(master)); - try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) { + try (HoodieTableMetadataWriter writer = SparkMetadataWriterFactory.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc), metaClient.getTableConfig())) { // Empty } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index b550b166cf1..d7327734dc9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -120,7 +121,6 @@ import static org.apache.hudi.metadata.MetadataPartitionType.FILES; import static org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS; import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath; -import static org.apache.hudi.metadata.MetadataPartitionType.getEnabledPartitions; import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords; import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices; @@ -133,7 +133,7 @@ import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readS */ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableMetadataWriter { - private static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class); + static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class); // Virtual keys support for metadata table. This Field is // from the metadata payload schema. @@ -143,7 +143,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // Record index has a fixed size schema. This has been calculated based on experiments with default settings // for block size (1MB), compression (GZ) and disabling the hudi metadata fields. 
private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48; - private transient BaseHoodieWriteClient<?, I, ?, ?> writeClient; + protected transient BaseHoodieWriteClient<?, I, ?, ?> writeClient; protected HoodieWriteConfig metadataWriteConfig; protected HoodieWriteConfig dataWriteConfig; @@ -155,8 +155,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM protected StorageConfiguration<?> storageConf; protected final transient HoodieEngineContext engineContext; protected final List<MetadataPartitionType> enabledPartitionTypes; + // Is the MDT bootstrapped and ready to be read from - private boolean initialized = false; + boolean initialized = false; private HoodieTableFileSystemView metadataView; /** @@ -193,6 +194,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT Reader should have been opened post initialization"); } + List<MetadataPartitionType> getEnabledPartitions(TypedProperties writeConfigProps, HoodieTableMetaClient metaClient) { + return MetadataPartitionType.getEnabledPartitions(writeConfigProps, metaClient); + } + abstract HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient); private void mayBeReinitMetadataReader() { @@ -276,7 +281,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit // Otherwise, we use the timestamp of the latest completed action. String initializationTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP); - initializeFromFilesystem(initializationTime, metadataPartitionsToInit); + if (!initializeFromFilesystem(initializationTime, metadataPartitionsToInit, inflightInstantTimestamp)) { + LOG.error("Failed to initialize MDT from filesystem"); + return false; + } metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); return true; } catch (IOException e) { @@ -339,14 +347,22 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM return false; } + boolean shouldInitializeFromFilesystem(Set<String> pendingDataInstants, Option<String> inflightInstantTimestamp) { + return true; + } + /** * Initialize the Metadata Table by listing files and partitions from the file system. 
* * @param initializationTime - Timestamp to use for the commit * @param partitionsToInit - List of MDT partitions to initialize + * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private void initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit) throws IOException { + private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException { Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient); + if (!shouldInitializeFromFilesystem(pendingDataInstants, inflightInstantTimestamp)) { + return false; + } // FILES partition is always required and is initialized first boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionAvailable(FILES); @@ -496,6 +512,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM long totalInitTime = partitionInitTimer.endTimer(); LOG.info("Initializing {} index in metadata table took {} in ms", partitionTypeName, totalInitTime); } + return true; } /** @@ -513,7 +530,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM * @param initializationTime Timestamp from dataset to use for initialization * @return a unique timestamp for MDT */ - private String generateUniqueInstantTime(String initializationTime) { + String generateUniqueInstantTime(String initializationTime) { // If it's initialized via Async indexer, we don't need to alter the init time. // otherwise yields the timestamp on the fly. // This function would be called multiple times in a single application if multiple indexes are being @@ -799,13 +816,17 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM .collect(Collectors.toSet()); } + String getTimelineHistoryPath() { + return TIMELINE_HISTORY_PATH.defaultValue(); + } + private HoodieTableMetaClient initializeMetaClient() throws IOException { HoodieTableMetaClient.newTableBuilder() .setTableType(HoodieTableType.MERGE_ON_READ) .setTableName(dataWriteConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX) // MT version should match DT, such that same readers can read both. .setTableVersion(dataWriteConfig.getWriteVersion()) - .setArchiveLogFolder(TIMELINE_HISTORY_PATH.defaultValue()) + .setArchiveLogFolder(getTimelineHistoryPath()) .setPayloadClassName(HoodieMetadataPayload.class.getName()) .setBaseFileFormat(HoodieFileFormat.HFILE.toString()) .setRecordKeyFields(RECORD_KEY_FIELD_NAME) @@ -1005,7 +1026,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM * Interface to assist in converting commit metadata to List of HoodieRecords to be written to metadata table. * Updates of different commit metadata uses the same method to convert to HoodieRecords and hence. */ - private interface ConvertMetadataFunction { + interface ConvertMetadataFunction { Map<String, HoodieData<HoodieRecord>> convertMetadata(); } @@ -1015,7 +1036,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. 
*/ - private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { + void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate(); if (initialized && metadata != null) { // convert metadata and filter only the entries whose partition path are in partitionsToUpdate @@ -1067,7 +1088,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, partitionPaths); // initialize partitions - initializeFromFilesystem(instantTime, partitionTypes); + initializeFromFilesystem(instantTime, partitionTypes, Option.empty()); } /** @@ -1248,7 +1269,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // We cannot create a deltaCommit at instantTime now because a future (rollback) block has already been written to the logFiles. // We need to choose a timestamp which would be a validInstantTime for MDT. This is either a commit timestamp completed on the dataset // or a new timestamp which we use for MDT clean, compaction etc. - String syncCommitTime = writeClient.createNewInstantTime(false); + String syncCommitTime = createRestoreInstantTime(); processAndCommit(syncCommitTime, () -> HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext, partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, syncCommitTime)); closeInternal(); @@ -1257,6 +1278,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM } } + String createRestoreInstantTime() { + return writeClient.createNewInstantTime(false); + } + /** * Update from {@code HoodieRollbackMetadata}. * @@ -1493,12 +1518,12 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM return; } // Check and run clean operations. - cleanIfNecessary(writeClient); + cleanIfNecessary(writeClient, lastInstant.get().requestedTime()); // Do timeline validation before scheduling compaction/logCompaction operations. - if (validateCompactionScheduling()) { + if (validateCompactionScheduling(inFlightInstantTimestamp, lastInstant.get().requestedTime())) { String latestDeltacommitTime = lastInstant.get().requestedTime(); LOG.info("Latest deltacommit time found is {}, running compaction operations.", latestDeltacommitTime); - compactIfNecessary(writeClient); + compactIfNecessary(writeClient, Option.of(latestDeltacommitTime)); } writeClient.archive(); LOG.info("All the table services operations on MDT completed successfully"); @@ -1552,7 +1577,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a * deltacommit. */ - protected void compactIfNecessary(BaseHoodieWriteClient writeClient) { + void compactIfNecessary(BaseHoodieWriteClient<?,I,?,?> writeClient, Option<String> latestDeltaCommitTimeOpt) { // IMPORTANT: Trigger compaction with max instant time that is smaller than(or equals) the earliest pending instant from DT. // The compaction planner will manage to filter out the log files that finished with greater completion time. // see BaseHoodieCompactionPlanGenerator.generateCompactionPlan for more details. 
@@ -1586,7 +1611,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM } } - protected void cleanIfNecessary(BaseHoodieWriteClient writeClient) { + protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) { Option<HoodieInstant> lastCompletedCompactionInstant = metadataMetaClient.getActiveTimeline() .getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant(); if (lastCompletedCompactionInstant.isPresent() @@ -1603,14 +1628,18 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // Trigger cleaning with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. - writeClient.clean(metadataMetaClient.createNewInstantTime(false)); + writeClient.clean(createCleanInstantTime(instantTime)); writeClient.lazyRollbackFailedIndexing(); } + String createCleanInstantTime(String instantTime) { + return metadataMetaClient.createNewInstantTime(false); + } + /** * Validates the timeline for both main and metadata tables to ensure compaction on MDT can be scheduled. */ - protected boolean validateCompactionScheduling() { + boolean validateCompactionScheduling(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) { // Under the log compaction scope, the sequence of the log-compaction and compaction needs to be ensured because metadata items such as RLI // only has proc-time ordering semantics. For "ensured", it means the completion sequence of the log-compaction/compaction is the same as the start sequence. if (metadataWriteConfig.isLogCompactionEnabled()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java new file mode 100644 index 00000000000..33e5070b8fb --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.client.BaseHoodieWriteClient; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.storage.StorageConfiguration; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS; +import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; + +/** + * HoodieBackedTableMetadataWriter for tables with version 6. The class derives most of the functionality from HoodieBackedTableMetadataWriter + * and overrides some behaviour to make it compatible for version 6. + */ +public abstract class HoodieBackedTableMetadataWriterTableVersionSix<I> extends HoodieBackedTableMetadataWriter<I> { + + private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; + + /** + * Hudi backed table metadata writer. 
+ * + * @param storageConf Storage configuration to use for the metadata writer + * @param writeConfig Writer config + * @param failedWritesCleaningPolicy Cleaning policy on failed writes + * @param engineContext Engine context + * @param inflightInstantTimestamp Timestamp of any instant in progress + */ + protected HoodieBackedTableMetadataWriterTableVersionSix(StorageConfiguration<?> storageConf, + HoodieWriteConfig writeConfig, + HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, + HoodieEngineContext engineContext, + Option<String> inflightInstantTimestamp) { + super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp); + } + + @Override + List<MetadataPartitionType> getEnabledPartitions(TypedProperties writeConfigProps, HoodieTableMetaClient metaClient) { + return MetadataPartitionType.getEnabledPartitions(writeConfigProps, metaClient).stream() + .filter(partition -> !partition.equals(MetadataPartitionType.SECONDARY_INDEX)) + .filter(partition -> !partition.equals(MetadataPartitionType.EXPRESSION_INDEX)) + .filter(partition -> !partition.equals(MetadataPartitionType.PARTITION_STATS)) + .collect(Collectors.toList()); + } + + @Override + boolean shouldInitializeFromFilesystem(Set<String> pendingDataInstants, Option<String> inflightInstantTimestamp) { + if (pendingDataInstants.stream() + .anyMatch(i -> !inflightInstantTimestamp.isPresent() || !i.equals(inflightInstantTimestamp.get()))) { + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1)); + LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: {}", + Arrays.toString(pendingDataInstants.toArray())); + return false; + } else { + return true; + } + } + + @Override + String generateUniqueInstantTime(String initializationTime) { + // if its initialized via Async indexer, we don't need to alter the init time + HoodieTimeline dataIndexTimeline = dataMetaClient.getActiveTimeline().filter(instant -> instant.getAction().equals(HoodieTimeline.INDEXING_ACTION)); + if (HoodieTableMetadataUtil.isIndexingCommit(dataIndexTimeline, initializationTime)) { + return initializationTime; + } + // Add suffix to initializationTime to find an unused instant time for the next index initialization. + // This function would be called multiple times in a single application if multiple indexes are being + // initialized one after the other. + for (int offset = 0; ; ++offset) { + final String commitInstantTime = createIndexInitTimestamp(initializationTime, offset); + if (!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) { + return commitInstantTime; + } + } + } + + /** + * Create the timestamp for an index initialization operation on the metadata table. + * <p> + * Since many MDT partitions can be initialized one after other the offset parameter controls generating a + * unique timestamp. + */ + private String createIndexInitTimestamp(String timestamp, int offset) { + return String.format("%s%03d", timestamp, PARTITION_INITIALIZATION_TIME_SUFFIX + offset); + } + + @Override + String getTimelineHistoryPath() { + return ARCHIVELOG_FOLDER.defaultValue(); + } + + /** + * Validates the timeline for both main and metadata tables to ensure compaction on MDT can be scheduled. 
+ */ + @Override + boolean validateCompactionScheduling(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) { + // we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table. + // Whenever you want to change this logic, please ensure all below scenarios are considered. + // a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed + // b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, the latest compaction instant time in MDT represents + // any instants before that is already synced with metadata table. + // c. Do consider out of order commits. For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every + // instant before c4 is synced with metadata table. + List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested() + .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants(); + + if (!pendingInstants.isEmpty()) { + checkNumDeltaCommits(metadataMetaClient, dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending()); + LOG.info(String.format( + "Cannot compact metadata table as there are %d inflight instants in data table before latest deltacommit in metadata table: %s. Inflight instants in data table: %s", + pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, Arrays.toString(pendingInstants.toArray()))); + return false; + } + + // Check if there are any pending compaction or log compaction instants in the timeline. + // If pending compact/logCompaction operations are found abort scheduling new compaction/logCompaction operations. + Option<HoodieInstant> pendingLogCompactionInstant = + metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant(); + Option<HoodieInstant> pendingCompactionInstant = + metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant(); + if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) { + LOG.warn(String.format("Not scheduling compaction or logCompaction, since a pending compaction instant %s or logCompaction %s instant is present", + pendingCompactionInstant, pendingLogCompactionInstant)); + return false; + } + return true; + } + + private void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int maxNumDeltaCommitsWhenPending) { + final HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline(); + Option<HoodieInstant> lastCompaction = activeTimeline.filterCompletedInstants() + .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant(); + int numDeltaCommits = lastCompaction.isPresent() + ? activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().requestedTime()).countInstants() + : activeTimeline.getDeltaCommitTimeline().countInstants(); + if (numDeltaCommits > maxNumDeltaCommitsWhenPending) { + throw new HoodieMetadataException(String.format("Metadata table's deltacommits exceeded %d: " + + "this is likely caused by a pending instant in the data table. Resolve the pending instant " + + "or adjust `%s`, then restart the pipeline.", + maxNumDeltaCommitsWhenPending, HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key())); + } + } + + /** + * Update from {@code HoodieRollbackMetadata}. 
+ * + * @param rollbackMetadata {@code HoodieRollbackMetadata} + * @param instantTime Timestamp at which the rollback was performed + */ + @Override + public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) { + if (initialized && metadata != null) { + // The commit which is being rolled back on the dataset + final String commitToRollbackInstantTime = rollbackMetadata.getCommitsRollback().get(0); + // Find the deltacommits since the last compaction + Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo = + CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline()); + + // This could be a compaction or deltacommit instant (See CompactionUtils.getDeltaCommitsSinceLatestCompaction) + HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue(); + HoodieTimeline deltacommitsSinceCompaction = deltaCommitsInfo.get().getKey(); + + // The deltacommit that will be rolled back + HoodieInstant deltaCommitInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime, + InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + + validateRollbackVersionSix(commitToRollbackInstantTime, compactionInstant, deltacommitsSinceCompaction); + + // lets apply a delta commit with DT's rb instant containing following records: + // a. any log files as part of RB commit metadata that was added + // b. log files added by the commit in DT being rolled back. By rolled back, we mean, a rollback block will be added and does not mean it will be deleted. + // both above list should only be added to FILES partition. + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, dataMetaClient, rollbackMetadata, instantTime)); + + String rollbackInstantTime = createRollbackTimestamp(instantTime); + if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) { + LOG.info("Rolling back MDT deltacommit " + commitToRollbackInstantTime); + if (!getWriteClient().rollback(commitToRollbackInstantTime, rollbackInstantTime)) { + throw new HoodieMetadataException("Failed to rollback deltacommit at " + commitToRollbackInstantTime); + } + } else { + LOG.info("Ignoring rollback of instant {} at {}. The commit to rollback is not found in MDT", + commitToRollbackInstantTime, instantTime); + } + closeInternal(); + } + } + + private void validateRollbackVersionSix( + String commitToRollbackInstantTime, + HoodieInstant compactionInstant, + HoodieTimeline deltacommitsSinceCompaction) { + // The commit being rolled back should not be earlier than the latest compaction on the MDT. Compaction on MDT only occurs when all actions + // are completed on the dataset. Hence, this case implies a rollback of completed commit which should actually be handled using restore. + if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) { + final String compactionInstantTime = compactionInstant.requestedTime(); + if (compareTimestamps(commitToRollbackInstantTime, LESSER_THAN_OR_EQUALS, compactionInstantTime)) { + throw new HoodieMetadataException(String.format("Commit being rolled back %s is earlier than the latest compaction %s. " + + "There are %d deltacommits after this compaction: %s", commitToRollbackInstantTime, compactionInstantTime, + deltacommitsSinceCompaction.countInstants(), deltacommitsSinceCompaction.getInstants())); + } + } + } + + /** + * Perform a compaction on the Metadata Table. + * <p> + * Cases to be handled: + * 1. 
We cannot perform compaction if there are previous inflight operations on the dataset. This is because + * a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx. + * <p> + * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a + * deltacommit. + */ + @Override + void compactIfNecessary(BaseHoodieWriteClient<?,I,?,?> writeClient, Option<String> latestDeltaCommitTimeOpt) { + // Trigger compaction with suffixes based on the same instant time. This ensures that any future + // delta commits synced over will not have an instant time lesser than the last completed instant on the + // metadata table. + final String compactionInstantTime = createCompactionTimestamp(latestDeltaCommitTimeOpt.get()); + + // we need to avoid checking compaction w/ same instant again. + // let's say we trigger compaction after C5 in MDT and so compaction completes with C4001. but C5 crashed before completing in MDT. + // and again w/ C6, we will re-attempt compaction at which point latest delta commit is C4 in MDT. + // and so we try compaction w/ instant C4001. So, we can avoid compaction if we already have compaction w/ same instant time. + if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) { + LOG.info("Compaction with same {} time is already present in the timeline.", compactionInstantTime); + } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) { + LOG.info("Compaction is scheduled for timestamp {}", compactionInstantTime); + writeClient.compact(compactionInstantTime); + } else if (metadataWriteConfig.isLogCompactionEnabled()) { + // Schedule and execute log compaction with suffixes based on the same instant time. This ensures that any future + // delta commits synced over will not have an instant time lesser than the last completed instant on the + // metadata table. + final String logCompactionInstantTime = createLogCompactionTimestamp(latestDeltaCommitTimeOpt.get()); + if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) { + LOG.info("Log compaction with same {} time is already present in the timeline.", logCompactionInstantTime); + } else if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, Option.empty())) { + LOG.info("Log compaction is scheduled for timestamp {}", logCompactionInstantTime); + writeClient.logCompact(logCompactionInstantTime); + } + } + } + + @Override + String createCleanInstantTime(String instantTime) { + return createCleanTimestamp(instantTime); + } + + @Override + String createRestoreInstantTime() { + return createRestoreTimestamp(writeClient.createNewInstantTime(false)); + } + + /** + * Create the timestamp for a compaction operation on the metadata table. + */ + private String createCompactionTimestamp(String timestamp) { + return timestamp + getCompactionOperationSuffix(); + } + + /** + * Create the timestamp for a compaction operation on the metadata table. 
+ */ + private String createLogCompactionTimestamp(String timestamp) { + return timestamp + getLogCompactionOperationSuffix(); + } + + private String createRollbackTimestamp(String timestamp) { + return timestamp + getRollbackOperationSuffix(); + } + + private String createCleanTimestamp(String timestamp) { + return timestamp + getCleanOperationSuffix(); + } + + private String createRestoreTimestamp(String timestamp) { + return timestamp + getRestoreOperationSuffix(); + } + + private String getCompactionOperationSuffix() { + return "001"; + } + + private String getLogCompactionOperationSuffix() { + return "005"; + } + + private String getRollbackOperationSuffix() { + return "006"; + } + + private String getCleanOperationSuffix() { + return "002"; + } + + private String getRestoreOperationSuffix() { + return "003"; + } +} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index 025a0edf9fa..0146a2baf1b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory; import java.util.Set; +import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH; + /** * Wrapper for metrics-related operations. */ @@ -454,7 +456,12 @@ public class HoodieMetrics { HoodieTimeline filteredInstants = activeTimeline.filterCompletedInstants().filter(instant -> validActions.contains(instant.getAction())); Option<HoodieInstant> hoodieInstantOption = filteredInstants.lastInstant(); if (hoodieInstantOption.isPresent()) { - updateMetric(action, metricName, Long.parseLong(hoodieInstantOption.get().requestedTime())); + String requestedTime = hoodieInstantOption.get().requestedTime(); + if (hoodieInstantOption.get().requestedTime().length() > MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) { + // If requested instant is in MDT with table version six, it can contain suffix + requestedTime = requestedTime.substring(0, requestedTime.length() - 3); + } + updateMetric(action, metricName, Long.parseLong(requestedTime)); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index ad7aca9ec6a..a88a8fa1073 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -133,7 +133,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient); } - compactIfNecessary(writeClient); + compactIfNecessary(writeClient, Option.empty()); if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) { // if this is a new commit being applied to metadata for the first time @@ -170,7 +170,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad // reload timeline metadataMetaClient.reloadActiveTimeline(); - cleanIfNecessary(writeClient); + cleanIfNecessary(writeClient, ""); writeClient.archive(); // Update total size of the metadata and count of base/log 
files diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 58b2d98ffd7..2331596b0eb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -42,7 +42,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndexFactory; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.MetadataPartitionType; -import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.metadata.SparkMetadataWriterFactory; import org.apache.hudi.metrics.DistributedRegistry; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.BulkInsertPartitioner; @@ -300,8 +300,8 @@ public class SparkRDDWriteClient<T> extends metrics.emitMetadataEnablementMetrics(true, isMetadataColStatsAvailable, isMetadataBloomFilterAvailable, isMetadataRliAvailable); } - try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create( - context.getStorageConf(), config, context, inFlightInstantTimestamp)) { + try (HoodieTableMetadataWriter writer = SparkMetadataWriterFactory.create( + context.getStorageConf(), config, context, inFlightInstantTimestamp, tableConfig)) { if (writer.isInitialized()) { writer.performTableServices(inFlightInstantTimestamp); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java new file mode 100644 index 00000000000..81df778d2dd --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.client.BaseHoodieWriteClient; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.metrics.Registry; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.index.HoodieSparkIndexClient; +import org.apache.hudi.metrics.DistributedRegistry; +import org.apache.hudi.metrics.MetricsReporterType; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.spark.api.java.JavaRDD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER; + +public class SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieBackedTableMetadataWriterTableVersionSix<JavaRDD<HoodieRecord>> { + + private static final Logger LOG = LoggerFactory.getLogger(SparkHoodieBackedTableMetadataWriter.class); + + public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, + HoodieWriteConfig writeConfig, + HoodieEngineContext context, + Option<String> inflightInstantTimestamp) { + return new SparkHoodieBackedTableMetadataWriterTableVersionSix( + conf, writeConfig, EAGER, context, inflightInstantTimestamp); + } + + public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, + HoodieWriteConfig writeConfig, + HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, + HoodieEngineContext context, + Option<String> inflightInstantTimestamp) { + return new SparkHoodieBackedTableMetadataWriterTableVersionSix( + conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp); + } + + public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, + HoodieEngineContext context) { + return create(conf, writeConfig, context, Option.empty()); + } + + SparkHoodieBackedTableMetadataWriterTableVersionSix(StorageConfiguration<?> hadoopConf, + HoodieWriteConfig writeConfig, + HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, + HoodieEngineContext engineContext, + Option<String> inflightInstantTimestamp) { + super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp); + } + + @Override + protected void initRegistry() { + if (metadataWriteConfig.isMetricsOn()) { + Registry registry; + if (metadataWriteConfig.isExecutorMetricsEnabled() && metadataWriteConfig.getMetricsReporterType() != MetricsReporterType.INMEMORY) { + registry = 
Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName()); + HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext; + ((DistributedRegistry) registry).register(sparkEngineContext.getJavaSparkContext()); + } else { + registry = Registry.getRegistry("HoodieMetadata"); + } + this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), dataMetaClient.getStorage())); + } else { + this.metrics = Option.empty(); + } + } + + @Override + protected void commit(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap) { + commitInternal(instantTime, partitionRecordsMap, false, Option.empty()); + } + + @Override + protected JavaRDD<HoodieRecord> convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> records) { + return HoodieJavaRDD.getJavaRDD(records); + } + + @Override + protected void bulkCommit( + String instantTime, String partitionName, HoodieData<HoodieRecord> records, + int fileGroupCount) { + SparkHoodieMetadataBulkInsertPartitioner partitioner = new SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount); + commitInternal(instantTime, Collections.singletonMap(partitionName, records), true, Option.of(partitioner)); + } + + @Override + public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) { + List<String> partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()); + LOG.info("Deleting Metadata Table partitions: {}", partitionsToDrop); + + SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient(); + String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ); + writeClient.startCommitWithTime(instantTime, actionType); + writeClient.deletePartitions(partitionsToDrop, instantTime); + } + + @Override + protected HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) { + return HoodieSparkTable.create(writeConfig, engineContext, metaClient); + } + + @Override + public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> initializeWriteClient() { + return new SparkRDDWriteClient(engineContext, metadataWriteConfig, Option.empty()); + } + + @Override + protected EngineType getEngineType() { + return EngineType.SPARK; + } + + @Override + protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex) { + new HoodieSparkIndexClient(dataWriteConfig, engineContext).createOrUpdateColumnStatsIndexDefinition(dataMetaClient, columnsToIndex); + } + + @Override + protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition, + HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, StorageConfiguration<?> storageConf, String instantTime) { + throw new HoodieNotSupportedException("Expression index not supported for Java metadata table writer yet."); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java new file mode 100644 index 00000000000..3c6c9c4c1c3 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StorageConfiguration;
+
+/**
+ * Factory class for generating SparkHoodieBackedTableMetadataWriter based on the table version.
+ */
+public class SparkMetadataWriterFactory {
+
+  public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context,
+                                                 Option<String> inflightInstantTimestamp, HoodieTableConfig tableConfig) {
+    if (tableConfig.getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
+      return SparkHoodieBackedTableMetadataWriterTableVersionSix.create(conf, writeConfig, context, inflightInstantTimestamp);
+    } else {
+      return SparkHoodieBackedTableMetadataWriter.create(conf, writeConfig, context, inflightInstantTimestamp);
+    }
+  }
+
+  public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                 HoodieEngineContext context, Option<String> inflightInstantTimestamp, HoodieTableConfig tableConfig) {
+    if (tableConfig.getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
+      return new SparkHoodieBackedTableMetadataWriterTableVersionSix(conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp);
+    } else {
+      return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp);
+    }
+  }
+
+  public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, HoodieTableConfig tableConfig) {
+    return create(conf, writeConfig, context, Option.empty(), tableConfig);
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 0ff6d8ab551..ac6428350dc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -36,7 +36,7 @@ import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
 import
org.apache.hudi.table.action.commit.HoodieMergeHelper; import org.apache.hadoop.conf.Configuration; @@ -105,9 +105,9 @@ public abstract class HoodieSparkTable<T> // Create the metadata table writer. First time after the upgrade this creation might trigger // metadata table bootstrapping. Bootstrapping process could fail and checking the table // existence after the creation is needed. - HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create( + HoodieTableMetadataWriter metadataWriter = SparkMetadataWriterFactory.create( getContext().getStorageConf(), config, failedWritesCleaningPolicy, getContext(), - Option.of(triggeringInstantTimestamp)); + Option.of(triggeringInstantTimestamp), metaClient.getTableConfig()); try { if (isMetadataTableExists || metaClient.getStorage().exists( HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index 0fabbe27e15..67d633f8fa8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -27,11 +27,13 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieDataBlock; @@ -59,8 +61,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,15 +102,22 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { private static final Logger LOG = LoggerFactory.getLogger(TestHoodieBackedTableMetadata.class); @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testTableOperations(boolean reuseReaders) throws Exception { + @CsvSource({"true,6", "true,8", "false,6", "false,8"}) + public void testTableOperations(boolean reuseReaders, int tableVersion) throws Exception { HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE; - init(tableType); + initPath(); + HoodieWriteConfig config = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false) + .build(); + config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion)); + init(tableType, config); doWriteInsertAndUpsert(testTable); // trigger an upsert doWriteOperation(testTable, 
"0000003"); verifyBaseMetadataTable(reuseReaders); + HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() + .getTableConfig().getTableVersion(); + assertEquals(tableVersion, finalTableVersion.versionCode()); } /** @@ -119,11 +127,15 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { * @throws Exception */ @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse) throws Exception { + @CsvSource({"true,6", "true,8", "false,6", "false,8"}) + public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse, int tableVersion) throws Exception { final int taskNumber = 3; HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE; - init(tableType); + initPath(); + HoodieWriteConfig config = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false) + .build(); + config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion)); + init(tableType, config); testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1); HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context, storage, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), reuse); assertTrue(tableMetadata.enabled()); @@ -161,6 +173,11 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { executors.awaitTermination(5, TimeUnit.MINUTES); assertFalse(flag.get()); assertEquals(filesNumber.get(), taskNumber); + + // validate table version + HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() + .getTableConfig().getTableVersion(); + assertEquals(tableVersion, finalTableVersion.versionCode()); } private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception { @@ -207,29 +224,47 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { * the right key generator class name. */ @ParameterizedTest - @EnumSource(HoodieTableType.class) - public void testMetadataTableKeyGenerator(final HoodieTableType tableType) throws Exception { - init(tableType); + @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8"}) + public void testMetadataTableKeyGenerator(final HoodieTableType tableType, int tableVersion) throws Exception { + initPath(); + HoodieWriteConfig config = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false) + .build(); + config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion)); + init(tableType, config); HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context, storage, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), false); assertEquals(HoodieTableMetadataKeyGenerator.class.getCanonicalName(), tableMetadata.getMetadataMetaClient().getTableConfig().getKeyGeneratorClassName()); + + // validate table version + HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() + .getTableConfig().getTableVersion(); + assertEquals(tableVersion, finalTableVersion.versionCode()); } /** * [HUDI-2852] Table metadata returns empty for non-exist partition. 
*/ @ParameterizedTest - @EnumSource(HoodieTableType.class) - public void testNotExistPartition(final HoodieTableType tableType) throws Exception { - init(tableType); + @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8"}) + public void testNotExistPartition(final HoodieTableType tableType, int tableVersion) throws Exception { + initPath(); + HoodieWriteConfig config = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false) + .build(); + config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion)); + init(tableType, config); HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context, storage, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), false); List<StoragePathInfo> allFilesInPartition = tableMetadata.getAllFilesInPartition( new StoragePath(writeConfig.getBasePath() + "dummy")); assertEquals(allFilesInPartition.size(), 0); + + // validate table version + HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() + .getTableConfig().getTableVersion(); + assertEquals(tableVersion, finalTableVersion.versionCode()); } /** @@ -240,8 +275,8 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { * 3. Verify table services like compaction benefit from record key deduplication feature. */ @ParameterizedTest - @EnumSource(HoodieTableType.class) - public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType) throws Exception { + @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8"}) + public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType, int tableVersion) throws Exception { initPath(); writeConfig = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder() @@ -249,6 +284,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { .withMaxNumDeltaCommitsBeforeCompaction(3) .build()) .build(); + writeConfig.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion)); init(tableType, writeConfig); // 2nd commit @@ -296,6 +332,11 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { }, "Metadata table should have a valid base file!"); validateMetadata(testTable); + + // validate table version + HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() + .getTableConfig().getTableVersion(); + assertEquals(tableVersion, finalTableVersion.versionCode()); } /** @@ -305,8 +346,8 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { * plan has not been successfully executed before the new one is scheduled. 
*/ @ParameterizedTest - @EnumSource(HoodieTableType.class) - public void testRepeatedCleanActionsWithMetadataTableEnabled(final HoodieTableType tableType) throws Exception { + @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8"}) + public void testRepeatedCleanActionsWithMetadataTableEnabled(final HoodieTableType tableType, int tableVersion) throws Exception { initPath(); writeConfig = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder() @@ -314,6 +355,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { .withMaxNumDeltaCommitsBeforeCompaction(4) .build()) .build(); + writeConfig.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion)); init(tableType, writeConfig); String partition = "p1"; // Simulate two bulk insert operations adding two data files in partition "p1" @@ -362,6 +404,11 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { assertEquals(1, getNumCompactions(metadataMetaClient)); Set<String> fileSetAfterSecondCleaning = getFilePathsInPartition(partition); validateFilesAfterCleaning(deleteFileList, fileSetBeforeCleaning, fileSetAfterSecondCleaning); + + // validate table version + HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() + .getTableConfig().getTableVersion(); + assertEquals(tableVersion, finalTableVersion.versionCode()); } private int getNumCompactions(HoodieTableMetaClient metaClient) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 1c7a929ee91..c15950ae22d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.timeline.HoodieTimelineArchiver; import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; @@ -103,7 +104,7 @@ public class TestHoodieMetadataBase extends HoodieSparkClientTestHarness { initHoodieStorage(); storage.createDirectory(new StoragePath(basePath)); initTimelineService(); - initMetaClient(tableType); + initMetaClient(tableType, writeConfig.map(conf -> conf.getProps()).orElse(new TypedProperties())); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); this.writeConfig = writeConfig.isPresent() diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index ead5f77518f..cc8aad17c61 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1118,6 +1118,22 @@ public class HoodieTableMetadataUtil { } } + /** + * Convert rollback action metadata to metadata table records. 
+ * <p> + * We only need to handle FILES partition here as HUDI rollbacks on MOR table may end up adding a new log file. All other partitions + * are handled by actual rollback of the deltacommit which added records to those partitions. + */ + public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords( + HoodieEngineContext engineContext, HoodieTableMetaClient dataTableMetaClient, HoodieRollbackMetadata rollbackMetadata, String instantTime) { + + List<HoodieRecord> filesPartitionRecords = HoodieTableMetadataUtil.convertMetadataToRollbackRecords(rollbackMetadata, instantTime, dataTableMetaClient); + final HoodieData<HoodieRecord> rollbackRecordsRDD = filesPartitionRecords.isEmpty() ? engineContext.emptyHoodieData() + : engineContext.parallelize(filesPartitionRecords, filesPartitionRecords.size()); + + return Collections.singletonMap(MetadataPartitionType.FILES.getPartitionPath(), rollbackRecordsRDD); + } + /** * Convert rollback action metadata to files partition records. * Consider only new log files added. diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 580362350d9..e55e8278708 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -233,6 +233,10 @@ public class HoodieTestUtils { builder.setKeyGeneratorType(properties.getProperty(HoodieTableConfig.KEY_GENERATOR_TYPE.key())); } + if (properties.containsKey("hoodie.write.table.version")) { + builder.setTableVersion(Integer.parseInt(properties.getProperty("hoodie.write.table.version"))); + } + String keyGen = properties.getProperty("hoodie.datasource.write.keygenerator.class"); if (!Objects.equals(keyGen, "org.apache.hudi.keygen.NonpartitionedKeyGenerator") && !properties.containsKey("hoodie.datasource.write.partitionpath.field")) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala index 4b81abe0d70..9f3b421d68e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures import org.apache.hudi.SparkAdapterSupport import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.util.HoodieTimer -import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter} +import org.apache.hudi.metadata.{HoodieTableMetadata, SparkMetadataWriterFactory} import org.apache.hudi.storage.StoragePath import org.apache.spark.sql.Row @@ -63,7 +63,7 @@ class CreateMetadataTableProcedure extends BaseProcedure with ProcedureBuilder w } val timer = HoodieTimer.start val writeConfig = getWriteConfig(basePath) - SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf, writeConfig, new HoodieSparkEngineContext(jsc)) + SparkMetadataWriterFactory.create(metaClient.getStorageConf, writeConfig, new HoodieSparkEngineContext(jsc), metaClient.getTableConfig) Seq(Row("Created Metadata Table in " + metadataPath + " (duration=" + 
timer.endTimer / 1000.0 + "secs)")) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala index 4864a70a9ad..2687f50471d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures import org.apache.hudi.SparkAdapterSupport import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.util.HoodieTimer -import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter} +import org.apache.hudi.metadata.{HoodieTableMetadata, SparkMetadataWriterFactory} import org.apache.hudi.storage.StoragePath import org.apache.spark.internal.Logging @@ -64,7 +64,7 @@ class InitMetadataTableProcedure extends BaseProcedure with ProcedureBuilder wit val timer = HoodieTimer.start if (!readOnly) { val writeConfig = getWriteConfig(basePath) - SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf, writeConfig, new HoodieSparkEngineContext(jsc)) + SparkMetadataWriterFactory.create(metaClient.getStorageConf, writeConfig, new HoodieSparkEngineContext(jsc), metaClient.getTableConfig) } val action = if (readOnly) "Opened" else "Initialized" diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 44d08f6fb4d..191ad1abf91 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -94,12 +94,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness { @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exception { + @CsvSource({"true,6", "true,8", "false,6", "false,8"}) + void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers, int tableVersion) throws Exception { // Set TableType to COW - HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE); + Properties properties = new Properties(); + properties.put(HoodieTableConfig.VERSION.key(), String.valueOf(tableVersion)); + properties.put(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), String.valueOf(tableVersion)); + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE, properties); HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers); + cfg.setValue(HoodieTableConfig.VERSION, String.valueOf(tableVersion)); + cfg.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion)); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala index e1c4775f35a..17449e8d098 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala @@ -125,7 +125,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { .save(basePath) dfList = dfList :+ inputDF - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() if (params.shouldValidateColStats) { // Currently, routine manually validating the column stats (by actually reading every column of every file) @@ -292,7 +292,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { val metadataConfig = HoodieMetadataConfig.newBuilder() .fromProperties(toProperties(metadataOpts)) .build() - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() val schemaUtil = new TableSchemaResolver(metaClient) val tableSchema = schemaUtil.getTableAvroSchema(false) val localSourceTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) @@ -442,27 +442,32 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { object ColumnStatIndexTestBase { - case class ColumnStatsTestCase(tableType: HoodieTableType, shouldReadInMemory: Boolean) + case class ColumnStatsTestCase(tableType: HoodieTableType, shouldReadInMemory: Boolean, tableVersion: Int) def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] = { java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType => - Seq(Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true)), - Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = false)) + Seq(Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion = 6)), + Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = false, tableVersion = 6)), + Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion = 8)), + Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = false, tableVersion = 8)) ) ): _*) } def testMetadataColumnStatsIndexParamsInMemory: java.util.stream.Stream[Arguments] = { java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType => - Seq(Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true)) + Seq(Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion = 6)), + Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion = 8)) ) ): _*) } def testMetadataColumnStatsIndexParamsForMOR: java.util.stream.Stream[Arguments] = { java.util.stream.Stream.of( - Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = true)), - Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = false)) + Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = true, tableVersion = 6)), + Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = false, tableVersion = 6)), + Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = true, tableVersion 
= 8)), + Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = false, tableVersion = 8)) ) : _*) } @@ -470,11 +475,19 @@ object ColumnStatIndexTestBase { def testTableTypePartitionTypeParams: java.util.stream.Stream[Arguments] = { java.util.stream.Stream.of( Seq( - Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8"), + // Table version 6 + Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8", "6"), // empty partition col represents non-partitioned table. - Arguments.arguments(HoodieTableType.COPY_ON_WRITE, ""), - Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8"), - Arguments.arguments(HoodieTableType.MERGE_ON_READ, "") + Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "", "6"), + Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8", "6"), + Arguments.arguments(HoodieTableType.MERGE_ON_READ, "", "6"), + + // Table version 8 + Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8", "8"), + // empty partition col represents non-partitioned table. + Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "", "8"), + Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8", "8"), + Arguments.arguments(HoodieTableType.MERGE_ON_READ, "", "8") ) : _*) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala index 35fdc8b041e..872bca957a7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala @@ -37,7 +37,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase { HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true" ) - val commonOpts: Map[String, String] = Map( + def commonOpts: Map[String, String] = Map( PARTITIONPATH_FIELD.key -> "partition", HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15" @@ -94,7 +94,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase { if (!metaClientReloaded) { // initialization of meta client is required again after writing data so that // latest table configs are picked up - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() metaClientReloaded = true } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index 086443e0685..44ae5d05447 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -26,7 +26,8 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieStorageConfig} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType} -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion} +import 
org.apache.hudi.common.table.timeline.versioning.v1.InstantFileNameGeneratorV1 import org.apache.hudi.common.table.view.FileSystemViewManager import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR @@ -50,7 +51,7 @@ import org.apache.spark.sql.types._ import org.junit.jupiter.api._ import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{EnumSource, MethodSource, ValueSource} +import org.junit.jupiter.params.provider.{CsvSource, MethodSource} import java.util.Collections import java.util.stream.Collectors @@ -80,7 +81,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { RECORDKEY_FIELD.key -> "c1", PRECOMBINE_FIELD.key -> "c1", HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", - "hoodie.compact.inline.max.delta.commits" -> "10" + "hoodie.compact.inline.max.delta.commits" -> "10", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString ) ++ metadataOpts doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, @@ -171,7 +173,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { shouldValidateColStats = false, shouldValidateManually = false)) - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() validateNonExistantColumnsToIndexDefn(metaClient) } @@ -194,7 +196,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, RECORDKEY_FIELD.key -> "c1", PRECOMBINE_FIELD.key -> "c1", - HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString ) ++ metadataOpts var expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { @@ -239,8 +242,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { @ParameterizedTest @MethodSource(Array("testTableTypePartitionTypeParams")) - def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: HoodieTableType, partitionCol : String): Unit = { - val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true) + def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: HoodieTableType, partitionCol : String, tableVersion: Int): Unit = { + val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion) val metadataOpts = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false" @@ -255,7 +258,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { PRECOMBINE_FIELD.key -> "c1", PARTITIONPATH_FIELD.key -> partitionCol, HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", - HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "5" + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "5", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString ) ++ metadataOpts // inserts @@ -305,7 +309,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { "index/colstats/mor-bootstrap1-column-stats-index-table.json" } - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() val 
latestCompletedCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().requestedTime // lets validate that we have log files generated in case of MOR table @@ -355,8 +359,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { @ParameterizedTest @MethodSource(Array("testTableTypePartitionTypeParams")) - def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType: HoodieTableType, partitionCol : String): Unit = { - val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true) + def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType: HoodieTableType, partitionCol : String, tableVersion: Int): Unit = { + val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion) val metadataOpts = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false" @@ -371,7 +375,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { PRECOMBINE_FIELD.key -> "c1", PARTITIONPATH_FIELD.key() -> partitionCol, "hoodie.write.markers.type" -> "DIRECT", - HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString ) ++ metadataOpts // inserts @@ -412,7 +417,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { "index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json" } - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() val latestCompletedCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().requestedTime // updates a subset which are not deleted and enable col stats and validate bootstrap @@ -426,7 +431,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { parquetMaxFileSize = 100 * 1024 * 1024, smallFileLimit = 0)) - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0) validateColumnsToIndex(metaClient, DEFAULT_COLUMNS_TO_INDEX) @@ -434,7 +439,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { def simulateFailureForLatestCommit(tableType: HoodieTableType, partitionCol: String) : Unit = { // simulate failure for latest commit. 
- metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() var baseFileName : String = null var logFileName : String = null val lastCompletedCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get() @@ -452,12 +457,18 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { } else { metaClient.getStorage.listFiles(new StoragePath(metaClient.getBasePath, "9")) } - val baseFileFileStatus = dataFiles.stream().filter(fileStatus => fileStatus.getPath.getName.contains(lastCompletedCommit.requestedTime)).findFirst().get() + val baseFileFileStatus = dataFiles.stream().filter(fileStatus => fileStatus.getPath.getName.contains(lastCompletedCommit.requestedTime) + && fileStatus.getPath.getName.contains("parquet")).findFirst().get() baseFileName = baseFileFileStatus.getPath.getName } - val latestCompletedFileName = INSTANT_FILE_NAME_GENERATOR.getFileName(lastCompletedCommit) - metaClient.getStorage.deleteFile(new StoragePath(metaClient.getBasePath.toString + "/.hoodie/timeline/" + latestCompletedFileName)) + if (metaClient.getTableConfig.getTableVersion.lesserThan(HoodieTableVersion.EIGHT)) { + val latestCompletedFileName = new InstantFileNameGeneratorV1().getFileName(lastCompletedCommit) + metaClient.getStorage.deleteFile(new StoragePath(metaClient.getBasePath.toString + "/.hoodie/" + latestCompletedFileName)) + } else { + val latestCompletedFileName = INSTANT_FILE_NAME_GENERATOR.getFileName(lastCompletedCommit) + metaClient.getStorage.deleteFile(new StoragePath(metaClient.getBasePath.toString + "/.hoodie/timeline/" + latestCompletedFileName)) + } // re-create marker for the deleted file. if (tableType == HoodieTableType.MERGE_ON_READ) { @@ -475,11 +486,12 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { } } - @Test - def testMORDeleteBlocks(): Unit = { + @ParameterizedTest + @CsvSource(value = Array("6", "8")) + def testMORDeleteBlocks(tableVersion: Int): Unit = { val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ val partitionCol = "c8" - val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true) + val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion) val metadataOpts = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" @@ -494,7 +506,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { PRECOMBINE_FIELD.key -> "c1", PARTITIONPATH_FIELD.key() -> partitionCol, HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", - HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5" + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString ) ++ metadataOpts // inserts @@ -533,10 +546,10 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { } @ParameterizedTest - @ValueSource(strings = Array("", "c8")) - def testColStatsWithCleanCOW(partitionCol: String): Unit = { + @CsvSource(value = Array("'',6", "'',8", "c8,6", "c8,8")) + def testColStatsWithCleanCOW(partitionCol: String, tableVersion: Int): Unit = { val tableType: HoodieTableType = HoodieTableType.COPY_ON_WRITE - val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true) + val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion) val metadataOpts = Map( HoodieMetadataConfig.ENABLE.key -> "true" ) @@ -550,7 
+563,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { PRECOMBINE_FIELD.key -> "c1", PARTITIONPATH_FIELD.key() -> partitionCol, HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", - HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1" + HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString ) ++ metadataOpts // inserts @@ -598,10 +612,10 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { } @ParameterizedTest - @ValueSource(strings = Array("", "c8")) - def testColStatsWithCleanMOR(partitionCol: String): Unit = { + @CsvSource(value = Array("'',6", "'',8", "c8,6", "c8,8")) + def testColStatsWithCleanMOR(partitionCol: String, tableVersion: Int): Unit = { val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ - val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true) + val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion) val metadataOpts = Map( HoodieMetadataConfig.ENABLE.key -> "true" ) @@ -616,7 +630,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { PARTITIONPATH_FIELD.key() -> partitionCol, HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1", - HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2" + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString ) ++ metadataOpts // inserts @@ -662,13 +677,13 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { parquetMaxFileSize = 100 * 1024 * 1024, smallFileLimit = 0)) - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0) } @ParameterizedTest - @EnumSource(classOf[HoodieTableType]) - def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType): Unit = { + @CsvSource(value = Array("COPY_ON_WRITE,6", "MERGE_ON_READ,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,8")) + def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType, tableVersion: Int): Unit = { val metadataOpts = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" @@ -681,7 +696,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), RECORDKEY_FIELD.key -> "c1", PRECOMBINE_FIELD.key -> "c1", - HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString ) ++ metadataOpts val schema = StructType(StructField("c1", IntegerType, false) :: StructField("c2", StringType, true) :: Nil) @@ -698,7 +714,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { .mode(SaveMode.Overwrite) .save(basePath) - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() val metadataConfig = HoodieMetadataConfig.newBuilder() .fromProperties(toProperties(metadataOpts)) @@ -715,8 +731,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { } @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testMetadataColumnStatsWithFilesFilter(shouldReadInMemory: Boolean): Unit = { + 
@CsvSource(value = Array("true,6", "false,6", "true,8", "false,8")) + def testMetadataColumnStatsWithFilesFilter(shouldReadInMemory: Boolean, tableVersion: Int): Unit = { val targetColumnsToIndex = Seq("c1", "c2", "c3") val metadataOpts = Map( @@ -733,7 +749,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { PRECOMBINE_FIELD.key -> "c1", PARTITIONPATH_FIELD.key() -> "c8", HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", - HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true" + HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString ) ++ metadataOpts val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json-partition-pruning").toString @@ -752,7 +769,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { .mode(SaveMode.Overwrite) .save(basePath) - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() val metadataConfig = HoodieMetadataConfig.newBuilder() .fromProperties(toProperties(metadataOpts)) @@ -811,8 +828,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { } @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean): Unit = { + @CsvSource(value = Array("true,6", "false,6", "true,8", "false,8")) + def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean, tableVersion: Int): Unit = { var targetColumnsToIndex = Seq("c1", "c2", "c3") val metadataOpts = Map( @@ -828,7 +845,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { RECORDKEY_FIELD.key -> "c1", PRECOMBINE_FIELD.key -> "c1", HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", - HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true" + HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString ) ++ metadataOpts val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString @@ -847,7 +865,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { .mode(SaveMode.Overwrite) .save(basePath) - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() val metadataConfig = HoodieMetadataConfig.newBuilder() .fromProperties(toProperties(metadataOpts)) @@ -896,7 +914,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { .mode(SaveMode.Append) .save(basePath) - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() val requestedColumns = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS) .getSourceFields.toSeq.filterNot(colName => colName.startsWith("_hoodie")).sorted.toSeq @@ -925,8 +943,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { } @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testTranslateQueryFiltersIntoColumnStatsIndexFilterExpr(shouldReadInMemory: Boolean): Unit = { + @CsvSource(value = Array("true,6", "false,6", "true,8", "false,8")) + def testTranslateQueryFiltersIntoColumnStatsIndexFilterExpr(shouldReadInMemory: Boolean, tableVersion: Int): Unit = { val targetColumnsToIndex = Seq("c1", "c2", "c3") val metadataOpts = Map( @@ -941,7 +959,8 @@ class TestColumnStatsIndex extends 
ColumnStatIndexTestBase { HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", RECORDKEY_FIELD.key -> "c1", PRECOMBINE_FIELD.key -> "c1", - HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString ) ++ metadataOpts val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString @@ -960,7 +979,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { .mode(SaveMode.Overwrite) .save(basePath) - metaClient = HoodieTableMetaClient.reload(metaClient) + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() val metadataConfig = HoodieMetadataConfig.newBuilder() .fromProperties(toProperties(metadataOpts)) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala index e91c75f6e32..5411dc8c6c3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala @@ -123,6 +123,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { .save(basePath) val deletedDf3 = calculateMergedDf(latestBatchDf, operation, true) deletedDf3.cache() + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() validateDataAndRecordIndices(hudiOpts, deletedDf3) } @@ -419,14 +420,10 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { } @ParameterizedTest - @CsvSource(value = Array( - "COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8" - )) - def testRLIWithDTClustering(tableType: String, tableVersion: Int): Unit = { + @EnumSource(classOf[HoodieTableType]) + def testRLIWithDTClustering(tableType: HoodieTableType): Unit = { val hudiOpts = commonOpts ++ Map( - DataSourceWriteOptions.TABLE_TYPE.key -> tableType, - HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString, - HoodieTableConfig.VERSION.key() -> tableVersion.toString, + DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2" ) @@ -434,7 +431,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { for ((k, v) <- hudiOpts) { props.put(k, v) } - initMetaClient(HoodieTableType.valueOf(tableType), props) + initMetaClient(tableType, props) doWriteAndValidateDataAndRecordIndex(hudiOpts, operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, @@ -455,6 +452,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { assertTrue(getLatestClusteringInstant().get().requestedTime.compareTo(lastClusteringInstant.get().requestedTime) > 0) assertEquals(getLatestClusteringInstant(), metaClient.getActiveTimeline.lastInstant()) + validateDataAndRecordIndices(hudiOpts) // We are validating rollback of a DT clustering instant here rollbackLastInstant(hudiOpts) validateDataAndRecordIndices(hudiOpts) @@ -545,6 +543,9 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { doWriteAndValidateDataAndRecordIndex(hudiOpts, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append) + doWriteAndValidateDataAndRecordIndex(hudiOpts, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + 
saveMode = SaveMode.Append) val metadataTableFSView = getHoodieTable(metaClient, getWriteConfig(hudiOpts)).getMetadataTable .asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView val compactionTimeline = metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexTableVersionSix.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexTableVersionSix.scala new file mode 100644 index 00000000000..71e31057b7c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexTableVersionSix.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.config.HoodieWriteConfig + +class TestRecordLevelIndexTableVersionSix extends TestRecordLevelIndex { + override def commonOpts: Map[String, String] = super.commonOpts ++ Map( + HoodieTableConfig.VERSION.key() -> "6", + HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> "6" + ) +}
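
For reference, a minimal caller-side sketch of the SparkMetadataWriterFactory entry point that the create/init metadata table procedures and the HoodieSparkTable change above now call: the data table's HoodieTableConfig is passed in so the factory can return a metadata writer compatible with the table version recorded on storage. This sketch assumes an already-initialized Hudi table at basePath, a live JavaSparkContext, and the hudi-spark-client module on the classpath; the class and method names below are illustrative, not taken from the patch.

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkMetadataWriterFactory;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.spark.api.java.JavaSparkContext;

public class MetadataWriterExample {

  // Obtain a metadata table writer through the version-aware factory; the caller is
  // responsible for closing the returned writer when done.
  static HoodieTableMetadataWriter createMetadataWriter(
      String basePath, StorageConfiguration<?> storageConf, JavaSparkContext jsc) {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setBasePath(basePath)
        .setConf(storageConf)
        .build();
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .forTable(metaClient.getTableConfig().getTableName())
        .build();
    // Passing the data table's HoodieTableConfig lets the factory pick the writer
    // implementation that matches the table version (e.g. version six vs. version eight).
    return SparkMetadataWriterFactory.create(
        metaClient.getStorageConf(), writeConfig, new HoodieSparkEngineContext(jsc),
        metaClient.getTableConfig());
  }
}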
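
The parameterized tests above all follow the same round trip: pin hoodie.write.table.version on the write config, run the workload, then rebuild a meta client and assert the table version persisted in hoodie.properties. Below is a compact sketch of just that round trip, assuming an initialized Hudi table at basePath and a StorageConfiguration for its file system; the class and method names are illustrative only.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.StorageConfiguration;

public class TableVersionRoundTrip {

  // Pin the writer to a specific table version (6 or 8 in the test matrix above).
  static HoodieWriteConfig pinWriteTableVersion(HoodieWriteConfig config, int tableVersion) {
    config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion));
    return config;
  }

  // Read back the table version actually recorded in the table's hoodie.properties.
  static int persistedTableVersion(String basePath, StorageConfiguration<?> storageConf) {
    HoodieTableVersion version = HoodieTableMetaClient.builder()
        .setBasePath(basePath)
        .setConf(storageConf)
        .build()
        .getTableConfig()
        .getTableVersion();
    return version.versionCode();
  }
}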
