This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3ddd0222a0b [HUDI-9013] Add backwards compatible MDT writer support and reader support with tbl v6 (#12948)
3ddd0222a0b is described below
commit 3ddd0222a0b9718af06f68bd98938b363e31726a
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Mar 20 07:50:59 2025 +0530
[HUDI-9013] Add backwards compatible MDT writer support and reader support with tbl v6 (#12948)
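
In short, the change routes all metadata table (MDT) writer creation through a table-version-aware factory, so that tables still on version 6 get a backwards compatible writer while version 8 tables keep the current one. A minimal caller-side sketch in Java, assuming basePath, storageConf, writeConfig and a JavaSparkContext jsc are already set up (only the factory call itself comes from this commit; the surrounding setup is illustrative):

    import org.apache.hudi.client.common.HoodieSparkEngineContext;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.metadata.HoodieTableMetadataWriter;
    import org.apache.hudi.metadata.SparkMetadataWriterFactory;

    // The table config carries the table version; the factory inspects it and
    // returns the version-six compatible writer for any version below EIGHT.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setBasePath(basePath).setConf(storageConf).build();
    try (HoodieTableMetadataWriter writer = SparkMetadataWriterFactory.create(
        storageConf, writeConfig, new HoodieSparkEngineContext(jsc),
        metaClient.getTableConfig())) {
      // writer is SparkHoodieBackedTableMetadataWriterTableVersionSix for v6
      // tables, SparkHoodieBackedTableMetadataWriter otherwise.
    }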
---
.../apache/hudi/cli/commands/MetadataCommand.java | 10 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 67 ++--
...ieBackedTableMetadataWriterTableVersionSix.java | 337 +++++++++++++++++++++
.../org/apache/hudi/metrics/HoodieMetrics.java | 9 +-
.../FlinkHoodieBackedTableMetadataWriter.java | 4 +-
.../apache/hudi/client/SparkRDDWriteClient.java | 6 +-
...ieBackedTableMetadataWriterTableVersionSix.java | 164 ++++++++++
.../hudi/metadata/SparkMetadataWriterFactory.java | 55 ++++
.../org/apache/hudi/table/HoodieSparkTable.java | 6 +-
.../functional/TestHoodieBackedTableMetadata.java | 83 +++--
.../client/functional/TestHoodieMetadataBase.java | 3 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 16 +
.../hudi/common/testutils/HoodieTestUtils.java | 4 +
.../procedures/CreateMetadataTableProcedure.scala | 4 +-
.../procedures/InitMetadataTableProcedure.scala | 4 +-
.../TestHoodieSparkMergeOnReadTableRollback.java | 11 +-
.../hudi/functional/ColumnStatIndexTestBase.scala | 37 ++-
.../hudi/functional/RecordLevelIndexTestBase.scala | 4 +-
.../hudi/functional/TestColumnStatsIndex.scala | 115 ++++---
.../hudi/functional/TestRecordLevelIndex.scala | 17 +-
.../TestRecordLevelIndexTableVersionSix.scala | 29 ++
21 files changed, 856 insertions(+), 129 deletions(-)
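
One detail worth keeping in mind while reading the version-six writer below: with table version 6 the MDT derives the instant times of its table-service operations by appending a fixed three-digit suffix to the base instant (compaction "001", clean "002", restore "003", log compaction "005", rollback "006", and index initialization "010" onwards via String.format("%s%03d", ...)), which is also why HoodieMetrics now strips the last three characters before parsing an over-long requested time. A condensed sketch of that scheme, using a hypothetical helper name that stands in for the private create*Timestamp() methods of the new HoodieBackedTableMetadataWriterTableVersionSix:

    // Hypothetical stand-in for the create*Timestamp() helpers in the new
    // version-six writer; suffix 1 -> compaction, 2 -> clean, 3 -> restore,
    // 5 -> log compaction, 6 -> rollback, 10 + offset -> index initialization.
    static String suffixedInstant(String baseInstant, int operationSuffix) {
      return String.format("%s%03d", baseInstant, operationSuffix);
    }
    // e.g. suffixedInstant("20250320075059000", 1) -> "20250320075059000001"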
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index cabd9fdcaf6..5b98700d04f 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -37,7 +37,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
@@ -116,7 +116,7 @@ public class MetadataCommand {
public String create(
@ShellOption(value = "--sparkMaster", defaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master
) throws Exception {
- HoodieCLI.getTableMetaClient();
+ HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
StoragePath metadataPath = new StoragePath(getMetadataTableBasePath(HoodieCLI.basePath));
try {
List<StoragePathInfo> pathInfoList = HoodieCLI.storage.listDirectEntries(metadataPath);
@@ -131,7 +131,7 @@ public class MetadataCommand {
HoodieTimer timer = HoodieTimer.start();
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext(Option.of(master));
- try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) {
+ try (HoodieTableMetadataWriter writer = SparkMetadataWriterFactory.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc), metaClient.getTableConfig())) {
return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0);
}
}
@@ -163,7 +163,7 @@ public class MetadataCommand {
public String init(@ShellOption(value = "--sparkMaster", defaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
@ShellOption(value = {"--readonly"}, defaultValue = "false",
help = "Open in read-only mode") final boolean readOnly) throws Exception {
- HoodieCLI.getTableMetaClient();
+ HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
StoragePath metadataPath = new StoragePath(getMetadataTableBasePath(HoodieCLI.basePath));
try {
HoodieCLI.storage.listDirectEntries(metadataPath);
@@ -176,7 +176,7 @@ public class MetadataCommand {
if (!readOnly) {
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext(Option.of(master));
- try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) {
+ try (HoodieTableMetadataWriter writer = SparkMetadataWriterFactory.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc), metaClient.getTableConfig())) {
// Empty
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index b550b166cf1..d7327734dc9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -120,7 +121,6 @@ import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
import static org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath;
-import static org.apache.hudi.metadata.MetadataPartitionType.getEnabledPartitions;
import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords;
import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices;
@@ -133,7 +133,7 @@ import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readS
 */
public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableMetadataWriter {
- private static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);
+ static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);
// Virtual keys support for metadata table. This Field is
// from the metadata payload schema.
@@ -143,7 +143,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
// Record index has a fixed size schema. This has been calculated based on experiments with default settings
// for block size (1MB), compression (GZ) and disabling the hudi metadata fields.
private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
- private transient BaseHoodieWriteClient<?, I, ?, ?> writeClient;
+ protected transient BaseHoodieWriteClient<?, I, ?, ?> writeClient;
protected HoodieWriteConfig metadataWriteConfig;
protected HoodieWriteConfig dataWriteConfig;
@@ -155,8 +155,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
protected StorageConfiguration<?> storageConf;
protected final transient HoodieEngineContext engineContext;
protected final List<MetadataPartitionType> enabledPartitionTypes;
+
// Is the MDT bootstrapped and ready to be read from
- private boolean initialized = false;
+ boolean initialized = false;
private HoodieTableFileSystemView metadataView;
/**
@@ -193,6 +194,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT Reader should have been opened post initialization");
}
+ List<MetadataPartitionType> getEnabledPartitions(TypedProperties writeConfigProps, HoodieTableMetaClient metaClient) {
+ return MetadataPartitionType.getEnabledPartitions(writeConfigProps, metaClient);
+ }
+
abstract HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient);
private void mayBeReinitMetadataReader() {
@@ -276,7 +281,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
- initializeFromFilesystem(initializationTime, metadataPartitionsToInit);
+ if (!initializeFromFilesystem(initializationTime, metadataPartitionsToInit, inflightInstantTimestamp)) {
+ LOG.error("Failed to initialize MDT from filesystem");
+ return false;
+ }
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
return true;
} catch (IOException e) {
@@ -339,14 +347,22 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
return false;
}
+ boolean shouldInitializeFromFilesystem(Set<String> pendingDataInstants, Option<String> inflightInstantTimestamp) {
+ return true;
+ }
+
/**
* Initialize the Metadata Table by listing files and partitions from the file system.
*
* @param initializationTime - Timestamp to use for the commit
* @param partitionsToInit - List of MDT partitions to initialize
+ * @param inflightInstantTimestamp - Current action instant responsible for this initialization
*/
- private void initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit) throws IOException {
+ private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException {
Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
+ if (!shouldInitializeFromFilesystem(pendingDataInstants, inflightInstantTimestamp)) {
+ return false;
+ }
// FILES partition is always required and is initialized first
boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionAvailable(FILES);
@@ -496,6 +512,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
long totalInitTime = partitionInitTimer.endTimer();
LOG.info("Initializing {} index in metadata table took {} in ms", partitionTypeName, totalInitTime);
}
+ return true;
}
/**
@@ -513,7 +530,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
* @param initializationTime Timestamp from dataset to use for initialization
* @return a unique timestamp for MDT
*/
- private String generateUniqueInstantTime(String initializationTime) {
+ String generateUniqueInstantTime(String initializationTime) {
// If it's initialized via Async indexer, we don't need to alter the init time.
// otherwise yields the timestamp on the fly.
// This function would be called multiple times in a single application if multiple indexes are being
@@ -799,13 +816,17 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
.collect(Collectors.toSet());
}
+ String getTimelineHistoryPath() {
+ return TIMELINE_HISTORY_PATH.defaultValue();
+ }
+
private HoodieTableMetaClient initializeMetaClient() throws IOException {
HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.MERGE_ON_READ)
.setTableName(dataWriteConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX)
// MT version should match DT, such that same readers can read both.
.setTableVersion(dataWriteConfig.getWriteVersion())
- .setArchiveLogFolder(TIMELINE_HISTORY_PATH.defaultValue())
+ .setArchiveLogFolder(getTimelineHistoryPath())
.setPayloadClassName(HoodieMetadataPayload.class.getName())
.setBaseFileFormat(HoodieFileFormat.HFILE.toString())
.setRecordKeyFields(RECORD_KEY_FIELD_NAME)
@@ -1005,7 +1026,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
* Interface to assist in converting commit metadata to List of HoodieRecords to be written to metadata table.
* Updates of different commit metadata uses the same method to convert to HoodieRecords and hence.
*/
- private interface ConvertMetadataFunction {
+ interface ConvertMetadataFunction {
Map<String, HoodieData<HoodieRecord>> convertMetadata();
}
@@ -1015,7 +1036,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
*/
- private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) {
+ void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) {
Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
if (initialized && metadata != null) {
// convert metadata and filter only the entries whose partition path are in partitionsToUpdate
@@ -1067,7 +1088,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, partitionPaths);
// initialize partitions
- initializeFromFilesystem(instantTime, partitionTypes);
+ initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}
/**
@@ -1248,7 +1269,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
// We cannot create a deltaCommit at instantTime now because a future (rollback) block has already been written to the logFiles.
// We need to choose a timestamp which would be a validInstantTime for MDT. This is either a commit timestamp completed on the dataset
// or a new timestamp which we use for MDT clean, compaction etc.
- String syncCommitTime = writeClient.createNewInstantTime(false);
+ String syncCommitTime = createRestoreInstantTime();
processAndCommit(syncCommitTime, () -> HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext, partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, syncCommitTime));
closeInternal();
@@ -1257,6 +1278,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
}
}
+ String createRestoreInstantTime() {
+ return writeClient.createNewInstantTime(false);
+ }
+
/**
* Update from {@code HoodieRollbackMetadata}.
*
@@ -1493,12 +1518,12 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
return;
}
// Check and run clean operations.
- cleanIfNecessary(writeClient);
+ cleanIfNecessary(writeClient, lastInstant.get().requestedTime());
// Do timeline validation before scheduling compaction/logCompaction operations.
- if (validateCompactionScheduling()) {
+ if (validateCompactionScheduling(inFlightInstantTimestamp, lastInstant.get().requestedTime())) {
String latestDeltacommitTime = lastInstant.get().requestedTime();
LOG.info("Latest deltacommit time found is {}, running compaction operations.", latestDeltacommitTime);
- compactIfNecessary(writeClient);
+ compactIfNecessary(writeClient, Option.of(latestDeltacommitTime));
}
writeClient.archive();
LOG.info("All the table services operations on MDT completed successfully");
@@ -1552,7 +1577,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
* 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
* deltacommit.
*/
- protected void compactIfNecessary(BaseHoodieWriteClient writeClient) {
+ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,?> writeClient, Option<String> latestDeltaCommitTimeOpt) {
// IMPORTANT: Trigger compaction with max instant time that is smaller than(or equals) the earliest pending instant from DT.
// The compaction planner will manage to filter out the log files that finished with greater completion time.
// see BaseHoodieCompactionPlanGenerator.generateCompactionPlan for more details.
@@ -1586,7 +1611,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
}
}
- protected void cleanIfNecessary(BaseHoodieWriteClient writeClient) {
+ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) {
Option<HoodieInstant> lastCompletedCompactionInstant = metadataMetaClient.getActiveTimeline()
.getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant();
if (lastCompletedCompactionInstant.isPresent()
@@ -1603,14 +1628,18 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
// Trigger cleaning with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
- writeClient.clean(metadataMetaClient.createNewInstantTime(false));
+ writeClient.clean(createCleanInstantTime(instantTime));
writeClient.lazyRollbackFailedIndexing();
}
+ String createCleanInstantTime(String instantTime) {
+ return metadataMetaClient.createNewInstantTime(false);
+ }
+
/**
* Validates the timeline for both main and metadata tables to ensure compaction on MDT can be scheduled.
*/
- protected boolean validateCompactionScheduling() {
+ boolean validateCompactionScheduling(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
// Under the log compaction scope, the sequence of the log-compaction and compaction needs to be ensured because metadata items such as RLI
// only has proc-time ordering semantics. For "ensured", it means the completion sequence of the log-compaction/compaction is the same as the start sequence.
if (metadataWriteConfig.isLogCompactionEnabled()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java
new file mode 100644
index 00000000000..33e5070b8fb
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
+
+/**
+ * HoodieBackedTableMetadataWriter for tables with version 6. The class derives most of the functionality from HoodieBackedTableMetadataWriter
+ * and overrides some behaviour to make it compatible for version 6.
+ */
+public abstract class HoodieBackedTableMetadataWriterTableVersionSix<I> extends HoodieBackedTableMetadataWriter<I> {
+
+ private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10;
+
+ /**
+ * Hudi backed table metadata writer.
+ *
+ * @param storageConf Storage configuration to use for the metadata writer
+ * @param writeConfig Writer config
+ * @param failedWritesCleaningPolicy Cleaning policy on failed writes
+ * @param engineContext Engine context
+ * @param inflightInstantTimestamp Timestamp of any instant in progress
+ */
+ protected HoodieBackedTableMetadataWriterTableVersionSix(StorageConfiguration<?> storageConf,
+ HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext engineContext,
+ Option<String> inflightInstantTimestamp) {
+ super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
+ }
+
+ @Override
+ List<MetadataPartitionType> getEnabledPartitions(TypedProperties writeConfigProps, HoodieTableMetaClient metaClient) {
+ return MetadataPartitionType.getEnabledPartitions(writeConfigProps, metaClient).stream()
+ .filter(partition -> !partition.equals(MetadataPartitionType.SECONDARY_INDEX))
+ .filter(partition -> !partition.equals(MetadataPartitionType.EXPRESSION_INDEX))
+ .filter(partition -> !partition.equals(MetadataPartitionType.PARTITION_STATS))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ boolean shouldInitializeFromFilesystem(Set<String> pendingDataInstants, Option<String> inflightInstantTimestamp) {
+ if (pendingDataInstants.stream()
+ .anyMatch(i -> !inflightInstantTimestamp.isPresent() || !i.equals(inflightInstantTimestamp.get()))) {
+ metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
+ LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: {}",
+ Arrays.toString(pendingDataInstants.toArray()));
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ String generateUniqueInstantTime(String initializationTime) {
+ // if its initialized via Async indexer, we don't need to alter the init time
+ HoodieTimeline dataIndexTimeline = dataMetaClient.getActiveTimeline().filter(instant -> instant.getAction().equals(HoodieTimeline.INDEXING_ACTION));
+ if (HoodieTableMetadataUtil.isIndexingCommit(dataIndexTimeline, initializationTime)) {
+ return initializationTime;
+ }
+ // Add suffix to initializationTime to find an unused instant time for the next index initialization.
+ // This function would be called multiple times in a single application if multiple indexes are being
+ // initialized one after the other.
+ for (int offset = 0; ; ++offset) {
+ final String commitInstantTime = createIndexInitTimestamp(initializationTime, offset);
+ if (!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+ return commitInstantTime;
+ }
+ }
+ }
+
+ /**
+ * Create the timestamp for an index initialization operation on the metadata table.
+ * <p>
+ * Since many MDT partitions can be initialized one after other the offset parameter controls generating a
+ * unique timestamp.
+ */
+ private String createIndexInitTimestamp(String timestamp, int offset) {
+ return String.format("%s%03d", timestamp, PARTITION_INITIALIZATION_TIME_SUFFIX + offset);
+ }
+
+ @Override
+ String getTimelineHistoryPath() {
+ return ARCHIVELOG_FOLDER.defaultValue();
+ }
+
+ /**
+ * Validates the timeline for both main and metadata tables to ensure compaction on MDT can be scheduled.
+ */
+ @Override
+ boolean validateCompactionScheduling(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+ // we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table.
+ // Whenever you want to change this logic, please ensure all below scenarios are considered.
+ // a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
+ // b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, the latest compaction instant time in MDT represents
+ // any instants before that is already synced with metadata table.
+ // c. Do consider out of order commits. For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every
+ // instant before c4 is synced with metadata table.
+ List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+ .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+
+ if (!pendingInstants.isEmpty()) {
+ checkNumDeltaCommits(metadataMetaClient, dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
+ LOG.info(String.format(
+ "Cannot compact metadata table as there are %d inflight instants in data table before latest deltacommit in metadata table: %s. Inflight instants in data table: %s",
+ pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, Arrays.toString(pendingInstants.toArray())));
+ return false;
+ }
+
+ // Check if there are any pending compaction or log compaction instants in the timeline.
+ // If pending compact/logCompaction operations are found abort scheduling new compaction/logCompaction operations.
+ Option<HoodieInstant> pendingLogCompactionInstant =
+ metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+ Option<HoodieInstant> pendingCompactionInstant =
+ metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+ if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
+ LOG.warn(String.format("Not scheduling compaction or logCompaction, since a pending compaction instant %s or logCompaction %s instant is present",
+ pendingCompactionInstant, pendingLogCompactionInstant));
+ return false;
+ }
+ return true;
+ }
+
+ private void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int maxNumDeltaCommitsWhenPending) {
+ final HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
+ Option<HoodieInstant> lastCompaction = activeTimeline.filterCompletedInstants()
+ .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
+ int numDeltaCommits = lastCompaction.isPresent()
+ ? activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().requestedTime()).countInstants()
+ : activeTimeline.getDeltaCommitTimeline().countInstants();
+ if (numDeltaCommits > maxNumDeltaCommitsWhenPending) {
+ throw new HoodieMetadataException(String.format("Metadata table's deltacommits exceeded %d: "
+ + "this is likely caused by a pending instant in the data table. Resolve the pending instant "
+ + "or adjust `%s`, then restart the pipeline.",
+ maxNumDeltaCommitsWhenPending, HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key()));
+ }
+ }
+
+ /**
+ * Update from {@code HoodieRollbackMetadata}.
+ *
+ * @param rollbackMetadata {@code HoodieRollbackMetadata}
+ * @param instantTime Timestamp at which the rollback was performed
+ */
+ @Override
+ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
+ if (initialized && metadata != null) {
+ // The commit which is being rolled back on the dataset
+ final String commitToRollbackInstantTime = rollbackMetadata.getCommitsRollback().get(0);
+ // Find the deltacommits since the last compaction
+ Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline());
+
+ // This could be a compaction or deltacommit instant (See CompactionUtils.getDeltaCommitsSinceLatestCompaction)
+ HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue();
+ HoodieTimeline deltacommitsSinceCompaction = deltaCommitsInfo.get().getKey();
+
+ // The deltacommit that will be rolled back
+ HoodieInstant deltaCommitInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime,
+ InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
+
+ validateRollbackVersionSix(commitToRollbackInstantTime, compactionInstant, deltacommitsSinceCompaction);
+
+ // lets apply a delta commit with DT's rb instant containing following records:
+ // a. any log files as part of RB commit metadata that was added
+ // b. log files added by the commit in DT being rolled back. By rolled back, we mean, a rollback block will be added and does not mean it will be deleted.
+ // both above list should only be added to FILES partition.
+ processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, dataMetaClient, rollbackMetadata, instantTime));
+
+ String rollbackInstantTime = createRollbackTimestamp(instantTime);
+ if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
+ LOG.info("Rolling back MDT deltacommit " + commitToRollbackInstantTime);
+ if (!getWriteClient().rollback(commitToRollbackInstantTime, rollbackInstantTime)) {
+ throw new HoodieMetadataException("Failed to rollback deltacommit at " + commitToRollbackInstantTime);
+ }
+ } else {
+ LOG.info("Ignoring rollback of instant {} at {}. The commit to rollback is not found in MDT",
+ commitToRollbackInstantTime, instantTime);
+ }
+ closeInternal();
+ }
+ }
+
+ private void validateRollbackVersionSix(
+ String commitToRollbackInstantTime,
+ HoodieInstant compactionInstant,
+ HoodieTimeline deltacommitsSinceCompaction) {
+ // The commit being rolled back should not be earlier than the latest compaction on the MDT. Compaction on MDT only occurs when all actions
+ // are completed on the dataset. Hence, this case implies a rollback of completed commit which should actually be handled using restore.
+ if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
+ final String compactionInstantTime = compactionInstant.requestedTime();
+ if (compareTimestamps(commitToRollbackInstantTime, LESSER_THAN_OR_EQUALS, compactionInstantTime)) {
+ throw new HoodieMetadataException(String.format("Commit being rolled back %s is earlier than the latest compaction %s. "
+ + "There are %d deltacommits after this compaction: %s", commitToRollbackInstantTime, compactionInstantTime,
+ deltacommitsSinceCompaction.countInstants(), deltacommitsSinceCompaction.getInstants()));
+ }
+ }
+ }
+
+ /**
+ * Perform a compaction on the Metadata Table.
+ * <p>
+ * Cases to be handled:
+ * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+ * a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
+ * <p>
+ * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
+ * deltacommit.
+ */
+ @Override
+ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,?> writeClient, Option<String> latestDeltaCommitTimeOpt) {
+ // Trigger compaction with suffixes based on the same instant time. This ensures that any future
+ // delta commits synced over will not have an instant time lesser than the last completed instant on the
+ // metadata table.
+ final String compactionInstantTime = createCompactionTimestamp(latestDeltaCommitTimeOpt.get());
+
+ // we need to avoid checking compaction w/ same instant again.
+ // let's say we trigger compaction after C5 in MDT and so compaction completes with C4001. but C5 crashed before completing in MDT.
+ // and again w/ C6, we will re-attempt compaction at which point latest delta commit is C4 in MDT.
+ // and so we try compaction w/ instant C4001. So, we can avoid compaction if we already have compaction w/ same instant time.
+ if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
+ LOG.info("Compaction with same {} time is already present in the timeline.", compactionInstantTime);
+ } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+ LOG.info("Compaction is scheduled for timestamp {}", compactionInstantTime);
+ writeClient.compact(compactionInstantTime);
+ } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+ // Schedule and execute log compaction with suffixes based on the same instant time. This ensures that any future
+ // delta commits synced over will not have an instant time lesser than the last completed instant on the
+ // metadata table.
+ final String logCompactionInstantTime = createLogCompactionTimestamp(latestDeltaCommitTimeOpt.get());
+ if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) {
+ LOG.info("Log compaction with same {} time is already present in the timeline.", logCompactionInstantTime);
+ } else if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, Option.empty())) {
+ LOG.info("Log compaction is scheduled for timestamp {}", logCompactionInstantTime);
+ writeClient.logCompact(logCompactionInstantTime);
+ }
+ }
+ }
+
+ @Override
+ String createCleanInstantTime(String instantTime) {
+ return createCleanTimestamp(instantTime);
+ }
+
+ @Override
+ String createRestoreInstantTime() {
+ return createRestoreTimestamp(writeClient.createNewInstantTime(false));
+ }
+
+ /**
+ * Create the timestamp for a compaction operation on the metadata table.
+ */
+ private String createCompactionTimestamp(String timestamp) {
+ return timestamp + getCompactionOperationSuffix();
+ }
+
+ /**
+ * Create the timestamp for a compaction operation on the metadata table.
+ */
+ private String createLogCompactionTimestamp(String timestamp) {
+ return timestamp + getLogCompactionOperationSuffix();
+ }
+
+ private String createRollbackTimestamp(String timestamp) {
+ return timestamp + getRollbackOperationSuffix();
+ }
+
+ private String createCleanTimestamp(String timestamp) {
+ return timestamp + getCleanOperationSuffix();
+ }
+
+ private String createRestoreTimestamp(String timestamp) {
+ return timestamp + getRestoreOperationSuffix();
+ }
+
+ private String getCompactionOperationSuffix() {
+ return "001";
+ }
+
+ private String getLogCompactionOperationSuffix() {
+ return "005";
+ }
+
+ private String getRollbackOperationSuffix() {
+ return "006";
+ }
+
+ private String getCleanOperationSuffix() {
+ return "002";
+ }
+
+ private String getRestoreOperationSuffix() {
+ return "003";
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 025a0edf9fa..0146a2baf1b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
import java.util.Set;
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH;
+
/**
* Wrapper for metrics-related operations.
*/
@@ -454,7 +456,12 @@ public class HoodieMetrics {
HoodieTimeline filteredInstants = activeTimeline.filterCompletedInstants().filter(instant -> validActions.contains(instant.getAction()));
Option<HoodieInstant> hoodieInstantOption = filteredInstants.lastInstant();
if (hoodieInstantOption.isPresent()) {
- updateMetric(action, metricName, Long.parseLong(hoodieInstantOption.get().requestedTime()));
+ String requestedTime = hoodieInstantOption.get().requestedTime();
+ if (hoodieInstantOption.get().requestedTime().length() > MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) {
+ // If requested instant is in MDT with table version six, it can contain suffix
+ requestedTime = requestedTime.substring(0, requestedTime.length() - 3);
+ }
+ updateMetric(action, metricName, Long.parseLong(requestedTime));
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index ad7aca9ec6a..a88a8fa1073 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -133,7 +133,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
}
- compactIfNecessary(writeClient);
+ compactIfNecessary(writeClient, Option.empty());
if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
// if this is a new commit being applied to metadata for the first time
@@ -170,7 +170,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
// reload timeline
metadataMetaClient.reloadActiveTimeline();
- cleanIfNecessary(writeClient);
+ cleanIfNecessary(writeClient, "");
writeClient.archive();
// Update total size of the metadata and count of base/log files
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 58b2d98ffd7..2331596b0eb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -42,7 +42,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -300,8 +300,8 @@ public class SparkRDDWriteClient<T> extends
metrics.emitMetadataEnablementMetrics(true, isMetadataColStatsAvailable, isMetadataBloomFilterAvailable, isMetadataRliAvailable);
}
- try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(
- context.getStorageConf(), config, context, inFlightInstantTimestamp)) {
+ try (HoodieTableMetadataWriter writer = SparkMetadataWriterFactory.create(
+ context.getStorageConf(), config, context, inFlightInstantTimestamp, tableConfig)) {
if (writer.isInitialized()) {
writer.performTableServices(inFlightInstantTimestamp);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
new file mode 100644
index 00000000000..81df778d2dd
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.index.HoodieSparkIndexClient;
+import org.apache.hudi.metrics.DistributedRegistry;
+import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
+
+public class SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieBackedTableMetadataWriterTableVersionSix<JavaRDD<HoodieRecord>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkHoodieBackedTableMetadataWriter.class);
+
+ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
+ HoodieWriteConfig writeConfig,
+ HoodieEngineContext context,
+ Option<String> inflightInstantTimestamp) {
+ return new SparkHoodieBackedTableMetadataWriterTableVersionSix(
+ conf, writeConfig, EAGER, context, inflightInstantTimestamp);
+ }
+
+ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
+ HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext context,
+ Option<String> inflightInstantTimestamp) {
+ return new SparkHoodieBackedTableMetadataWriterTableVersionSix(
+ conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp);
+ }
+
+ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig,
+ HoodieEngineContext context) {
+ return create(conf, writeConfig, context, Option.empty());
+ }
+
+ SparkHoodieBackedTableMetadataWriterTableVersionSix(StorageConfiguration<?> hadoopConf,
+ HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext engineContext,
+ Option<String> inflightInstantTimestamp) {
+ super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
+ }
+
+ @Override
+ protected void initRegistry() {
+ if (metadataWriteConfig.isMetricsOn()) {
+ Registry registry;
+ if (metadataWriteConfig.isExecutorMetricsEnabled() && metadataWriteConfig.getMetricsReporterType() != MetricsReporterType.INMEMORY) {
+ registry = Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName());
+ HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
+ ((DistributedRegistry) registry).register(sparkEngineContext.getJavaSparkContext());
+ } else {
+ registry = Registry.getRegistry("HoodieMetadata");
+ }
+ this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), dataMetaClient.getStorage()));
+ } else {
+ this.metrics = Option.empty();
+ }
+ }
+
+ @Override
+ protected void commit(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap) {
+ commitInternal(instantTime, partitionRecordsMap, false, Option.empty());
+ }
+
+ @Override
+ protected JavaRDD<HoodieRecord> convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> records) {
+ return HoodieJavaRDD.getJavaRDD(records);
+ }
+
+ @Override
+ protected void bulkCommit(
+ String instantTime, String partitionName, HoodieData<HoodieRecord> records,
+ int fileGroupCount) {
+ SparkHoodieMetadataBulkInsertPartitioner partitioner = new SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);
+ commitInternal(instantTime, Collections.singletonMap(partitionName, records), true, Option.of(partitioner));
+ }
+
+ @Override
+ public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
+ List<String> partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
+ LOG.info("Deleting Metadata Table partitions: {}", partitionsToDrop);
+
+ SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
+ String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
+ writeClient.startCommitWithTime(instantTime, actionType);
+ writeClient.deletePartitions(partitionsToDrop, instantTime);
+ }
+
+ @Override
+ protected HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
+ return HoodieSparkTable.create(writeConfig, engineContext, metaClient);
+ }
+
+ @Override
+ public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> initializeWriteClient() {
+ return new SparkRDDWriteClient(engineContext, metadataWriteConfig, Option.empty());
+ }
+
+ @Override
+ protected EngineType getEngineType() {
+ return EngineType.SPARK;
+ }
+
+ @Override
+ protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex) {
+ new HoodieSparkIndexClient(dataWriteConfig, engineContext).createOrUpdateColumnStatsIndexDefinition(dataMetaClient, columnsToIndex);
+ }
+
+ @Override
+ protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
+ HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, StorageConfiguration<?> storageConf, String instantTime) {
+ throw new HoodieNotSupportedException("Expression index not supported for Java metadata table writer yet.");
+ }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
new file mode 100644
index 00000000000..3c6c9c4c1c3
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StorageConfiguration;
+
+/**
+ * Factory class for generating SparkHoodieBackedTableMetadataWriter based on the table version.
+ */
+public class SparkMetadataWriterFactory {
+
+ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context,
+ Option<String> inflightInstantTimestamp, HoodieTableConfig tableConfig) {
+ if (tableConfig.getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
+ return SparkHoodieBackedTableMetadataWriterTableVersionSix.create(conf, writeConfig, context, inflightInstantTimestamp);
+ } else {
+ return SparkHoodieBackedTableMetadataWriter.create(conf, writeConfig, context, inflightInstantTimestamp);
+ }
+ }
+
+ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+ HoodieEngineContext context, Option<String> inflightInstantTimestamp, HoodieTableConfig tableConfig) {
+ if (tableConfig.getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
+ return new SparkHoodieBackedTableMetadataWriterTableVersionSix(conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp);
+ } else {
+ return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp);
+ }
+ }
+
+ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, HoodieTableConfig tableConfig) {
+ return create(conf, writeConfig, context, Option.empty(), tableConfig);
+ }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 0ff6d8ab551..ac6428350dc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -36,7 +36,7 @@ import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
import org.apache.hudi.table.action.commit.HoodieMergeHelper;
import org.apache.hadoop.conf.Configuration;
@@ -105,9 +105,9 @@ public abstract class HoodieSparkTable<T>
// Create the metadata table writer. First time after the upgrade this creation might trigger
// metadata table bootstrapping. Bootstrapping process could fail and checking the table
// existence after the creation is needed.
- HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
+ HoodieTableMetadataWriter metadataWriter = SparkMetadataWriterFactory.create(
getContext().getStorageConf(), config, failedWritesCleaningPolicy, getContext(),
- Option.of(triggeringInstantTimestamp));
+ Option.of(triggeringInstantTimestamp), metaClient.getTableConfig());
try {
if (isMetadataTableExists || metaClient.getStorage().exists(
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))) {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 0fabbe27e15..67d633f8fa8 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -27,11 +27,13 @@ import
org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -59,8 +61,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,15 +102,22 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestHoodieBackedTableMetadata.class);
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testTableOperations(boolean reuseReaders) throws Exception {
+ @CsvSource({"true,6", "true,8", "false,6", "false,8"})
+ public void testTableOperations(boolean reuseReaders, int tableVersion)
throws Exception {
HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
- init(tableType);
+ initPath();
+ HoodieWriteConfig config =
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true,
false, true, false)
+ .build();
+ config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION,
String.valueOf(tableVersion));
+ init(tableType, config);
doWriteInsertAndUpsert(testTable);
// trigger an upsert
doWriteOperation(testTable, "0000003");
verifyBaseMetadataTable(reuseReaders);
+ HoodieTableVersion finalTableVersion =
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+ .getTableConfig().getTableVersion();
+ assertEquals(tableVersion, finalTableVersion.versionCode());
}
/**
@@ -119,11 +127,15 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
* @throws Exception
*/
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse)
throws Exception {
+ @CsvSource({"true,6", "true,8", "false,6", "false,8"})
+ public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse, int
tableVersion) throws Exception {
final int taskNumber = 3;
HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
- init(tableType);
+ initPath();
+ HoodieWriteConfig config =
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true,
false, true, false)
+ .build();
+ config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION,
String.valueOf(tableVersion));
+ init(tableType, config);
testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1);
HoodieBackedTableMetadata tableMetadata = new
HoodieBackedTableMetadata(context, storage, writeConfig.getMetadataConfig(),
writeConfig.getBasePath(), reuse);
assertTrue(tableMetadata.enabled());
@@ -161,6 +173,11 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
executors.awaitTermination(5, TimeUnit.MINUTES);
assertFalse(flag.get());
assertEquals(filesNumber.get(), taskNumber);
+
+ // validate table version
+    HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build().getTableConfig().getTableVersion();
+ assertEquals(tableVersion, finalTableVersion.versionCode());
}
  private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
@@ -207,29 +224,47 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
* the right key generator class name.
*/
@ParameterizedTest
- @EnumSource(HoodieTableType.class)
-  public void testMetadataTableKeyGenerator(final HoodieTableType tableType) throws Exception {
-    init(tableType);
+  @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8"})
+  public void testMetadataTableKeyGenerator(final HoodieTableType tableType, int tableVersion) throws Exception {
+    initPath();
+    HoodieWriteConfig config = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false)
+      .build();
+    config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion));
+ init(tableType, config);
    HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context,
        storage, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), false);
assertEquals(HoodieTableMetadataKeyGenerator.class.getCanonicalName(),
tableMetadata.getMetadataMetaClient().getTableConfig().getKeyGeneratorClassName());
+
+ // validate table version
+    HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build().getTableConfig().getTableVersion();
+ assertEquals(tableVersion, finalTableVersion.versionCode());
}
/**
   * [HUDI-2852] Table metadata returns empty for a non-existent partition.
*/
@ParameterizedTest
- @EnumSource(HoodieTableType.class)
-  public void testNotExistPartition(final HoodieTableType tableType) throws Exception {
-    init(tableType);
+  @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8"})
+  public void testNotExistPartition(final HoodieTableType tableType, int tableVersion) throws Exception {
+    initPath();
+    HoodieWriteConfig config = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false)
+      .build();
+    config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion));
+ init(tableType, config);
    HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context,
        storage, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), false);
    List<StoragePathInfo> allFilesInPartition = tableMetadata.getAllFilesInPartition(
new StoragePath(writeConfig.getBasePath() + "dummy"));
assertEquals(allFilesInPartition.size(), 0);
+
+ // validate table version
+    HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build().getTableConfig().getTableVersion();
+ assertEquals(tableVersion, finalTableVersion.versionCode());
}
/**
@@ -240,8 +275,8 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
   * 3. Verify table services like compaction benefit from the record key deduplication feature.
*/
@ParameterizedTest
- @EnumSource(HoodieTableType.class)
-  public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType) throws Exception {
+  @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8"})
+  public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType, int tableVersion) throws Exception {
initPath();
writeConfig = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
@@ -249,6 +284,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
.withMaxNumDeltaCommitsBeforeCompaction(3)
.build())
.build();
+    writeConfig.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion));
init(tableType, writeConfig);
// 2nd commit
@@ -296,6 +332,11 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
}, "Metadata table should have a valid base file!");
validateMetadata(testTable);
+
+ // validate table version
+    HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build().getTableConfig().getTableVersion();
+ assertEquals(tableVersion, finalTableVersion.versionCode());
}
/**
@@ -305,8 +346,8 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
* plan has not been successfully executed before the new one is scheduled.
*/
@ParameterizedTest
- @EnumSource(HoodieTableType.class)
-  public void testRepeatedCleanActionsWithMetadataTableEnabled(final HoodieTableType tableType) throws Exception {
+  @CsvSource({"COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8"})
+  public void testRepeatedCleanActionsWithMetadataTableEnabled(final HoodieTableType tableType, int tableVersion) throws Exception {
initPath();
writeConfig = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
@@ -314,6 +355,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
.withMaxNumDeltaCommitsBeforeCompaction(4)
.build())
.build();
+    writeConfig.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion));
init(tableType, writeConfig);
String partition = "p1";
    // Simulate two bulk insert operations adding two data files in partition "p1"
@@ -362,6 +404,11 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
assertEquals(1, getNumCompactions(metadataMetaClient));
    Set<String> fileSetAfterSecondCleaning = getFilePathsInPartition(partition);
    validateFilesAfterCleaning(deleteFileList, fileSetBeforeCleaning, fileSetAfterSecondCleaning);
+
+ // validate table version
+    HoodieTableVersion finalTableVersion = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build().getTableConfig().getTableVersion();
+ assertEquals(tableVersion, finalTableVersion.versionCode());
}
private int getNumCompactions(HoodieTableMetaClient metaClient) {
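For context, the pattern these tests now follow, shown as a minimal standalone sketch (the harness helpers getWriteConfigBuilder/init come from TestHoodieMetadataBase; the literal version is illustrative):

    // Pin the writer to a specific table version before the table is initialized.
    HoodieWriteConfig config = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build();
    config.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, "6");
    init(HoodieTableType.COPY_ON_WRITE, config);
    // ... write operations ...
    // The version persisted in the table config should match the pinned version.
    HoodieTableVersion persisted = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build().getTableConfig().getTableVersion();
    assertEquals(6, persisted.versionCode());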
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 1c7a929ee91..c15950ae22d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
@@ -103,7 +104,7 @@ public class TestHoodieMetadataBase extends HoodieSparkClientTestHarness {
initHoodieStorage();
storage.createDirectory(new StoragePath(basePath));
initTimelineService();
- initMetaClient(tableType);
+    initMetaClient(tableType, writeConfig.map(conf -> conf.getProps()).orElse(new TypedProperties()));
initTestDataGenerator();
    metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
this.writeConfig = writeConfig.isPresent()
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index ead5f77518f..cc8aad17c61 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1118,6 +1118,22 @@ public class HoodieTableMetadataUtil {
}
}
+ /**
+ * Convert rollback action metadata to metadata table records.
+ * <p>
+   * We only need to handle the FILES partition here, as Hudi rollbacks on a MOR table may end up adding a new log file. All other partitions
+   * are handled by the actual rollback of the deltacommit which added records to those partitions.
+ */
+ public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(
+      HoodieEngineContext engineContext, HoodieTableMetaClient dataTableMetaClient, HoodieRollbackMetadata rollbackMetadata, String instantTime) {
+
+    List<HoodieRecord> filesPartitionRecords = HoodieTableMetadataUtil.convertMetadataToRollbackRecords(rollbackMetadata, instantTime, dataTableMetaClient);
+    final HoodieData<HoodieRecord> rollbackRecordsRDD = filesPartitionRecords.isEmpty() ? engineContext.emptyHoodieData()
+        : engineContext.parallelize(filesPartitionRecords, filesPartitionRecords.size());
+
+    return Collections.singletonMap(MetadataPartitionType.FILES.getPartitionPath(), rollbackRecordsRDD);
+ }
+
/**
* Convert rollback action metadata to files partition records.
* Consider only new log files added.
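A hedged sketch of how a caller might invoke the new helper when applying a rollback; the variables in scope (engineContext, dataTableMetaClient, rollbackMetadata, instantTime) are assumed, and the surrounding writer plumbing is not part of this patch:

    // Rollback metadata becomes records for the FILES partition only; the other
    // metadata partitions are corrected by rolling back the deltacommit itself.
    Map<String, HoodieData<HoodieRecord>> partitionToRecords =
        HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, dataTableMetaClient, rollbackMetadata, instantTime);
    HoodieData<HoodieRecord> filesRecords = partitionToRecords.get(MetadataPartitionType.FILES.getPartitionPath());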
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 580362350d9..e55e8278708 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -233,6 +233,10 @@ public class HoodieTestUtils {
builder.setKeyGeneratorType(properties.getProperty(HoodieTableConfig.KEY_GENERATOR_TYPE.key()));
}
+ if (properties.containsKey("hoodie.write.table.version")) {
+      builder.setTableVersion(Integer.parseInt(properties.getProperty("hoodie.write.table.version")));
+ }
+
    String keyGen = properties.getProperty("hoodie.datasource.write.keygenerator.class");
    if (!Objects.equals(keyGen, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
        && !properties.containsKey("hoodie.datasource.write.partitionpath.field")) {
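With the new branch above, test meta clients honor the writer's table version when it arrives via properties; a minimal sketch, assuming the properties flow into the builder shown above:

    // The presence of "hoodie.write.table.version" triggers builder.setTableVersion(...).
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.write.table.version", "6");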
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
index 4b81abe0d70..9f3b421d68e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.util.HoodieTimer
-import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter}
+import org.apache.hudi.metadata.{HoodieTableMetadata, SparkMetadataWriterFactory}
import org.apache.hudi.storage.StoragePath
import org.apache.spark.sql.Row
@@ -63,7 +63,7 @@ class CreateMetadataTableProcedure extends BaseProcedure with ProcedureBuilder w
}
val timer = HoodieTimer.start
val writeConfig = getWriteConfig(basePath)
-    SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf, writeConfig, new HoodieSparkEngineContext(jsc))
+    SparkMetadataWriterFactory.create(metaClient.getStorageConf, writeConfig, new HoodieSparkEngineContext(jsc), metaClient.getTableConfig)
    Seq(Row("Created Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "secs)"))
}
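The procedure now routes through SparkMetadataWriterFactory which, judging by the new classes in this patch, dispatches on the table version recorded in HoodieTableConfig to pick either the default writer or the table-version-six writer. The call shape, as used above (the local variable is illustrative):

    // The table config carries the version the factory dispatches on.
    HoodieTableMetadataWriter writer = SparkMetadataWriterFactory.create(
        metaClient.getStorageConf(), writeConfig, new HoodieSparkEngineContext(jsc), metaClient.getTableConfig());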
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
index 4864a70a9ad..2687f50471d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.util.HoodieTimer
-import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter}
+import org.apache.hudi.metadata.{HoodieTableMetadata, SparkMetadataWriterFactory}
import org.apache.hudi.storage.StoragePath
import org.apache.spark.internal.Logging
@@ -64,7 +64,7 @@ class InitMetadataTableProcedure extends BaseProcedure with ProcedureBuilder wit
val timer = HoodieTimer.start
if (!readOnly) {
val writeConfig = getWriteConfig(basePath)
-      SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf, writeConfig, new HoodieSparkEngineContext(jsc))
+      SparkMetadataWriterFactory.create(metaClient.getStorageConf, writeConfig, new HoodieSparkEngineContext(jsc), metaClient.getTableConfig)
}
val action = if (readOnly) "Opened" else "Initialized"
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 44d08f6fb4d..191ad1abf91 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -94,12 +94,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
@ParameterizedTest
- @ValueSource(booleans = {true, false})
-  void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exception {
+  @CsvSource({"true,6", "true,8", "false,6", "false,8"})
+  void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers, int tableVersion) throws Exception {
    // Set TableType to COW
-    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+    Properties properties = new Properties();
+    properties.put(HoodieTableConfig.VERSION.key(), String.valueOf(tableVersion));
+    properties.put(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), String.valueOf(tableVersion));
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE, properties);
    HoodieWriteConfig cfg = getConfig(false, rollbackUsingMarkers);
+    cfg.setValue(HoodieTableConfig.VERSION, String.valueOf(tableVersion));
+    cfg.setValue(HoodieWriteConfig.WRITE_TABLE_VERSION, String.valueOf(tableVersion));
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
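Note that the test pins two distinct knobs: HoodieTableConfig.VERSION fixes the version stamped into hoodie.properties when the table is created, while HoodieWriteConfig.WRITE_TABLE_VERSION is the version the write client targets. A minimal sketch of the pairing (the literal version is illustrative):

    Properties properties = new Properties();
    // Version recorded on disk at table creation time.
    properties.put(HoodieTableConfig.VERSION.key(), "6");
    // Version the writer will produce.
    properties.put(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "6");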
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index e1c4775f35a..17449e8d098 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -125,7 +125,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
.save(basePath)
dfList = dfList :+ inputDF
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
    if (params.shouldValidateColStats) {
      // Currently, routine manually validating the column stats (by actually reading every column of every file)
@@ -292,7 +292,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(metadataOpts))
.build()
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
    val schemaUtil = new TableSchemaResolver(metaClient)
    val tableSchema = schemaUtil.getTableAvroSchema(false)
    val localSourceTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
@@ -442,27 +442,32 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
object ColumnStatIndexTestBase {
-  case class ColumnStatsTestCase(tableType: HoodieTableType, shouldReadInMemory: Boolean)
+  case class ColumnStatsTestCase(tableType: HoodieTableType, shouldReadInMemory: Boolean, tableVersion: Int)
  def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] = {
    java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType =>
-      Seq(Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true)),
-        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = false))
+      Seq(Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion = 6)),
+        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = false, tableVersion = 6)),
+        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion = 8)),
+        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = false, tableVersion = 8))
)
): _*)
}
  def testMetadataColumnStatsIndexParamsInMemory: java.util.stream.Stream[Arguments] = {
    java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType =>
-      Seq(Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true))
+      Seq(Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion = 6)),
+        Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion = 8))
)
): _*)
}
  def testMetadataColumnStatsIndexParamsForMOR: java.util.stream.Stream[Arguments] = {
    java.util.stream.Stream.of(
-      Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = true)),
-        Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = false))
+      Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = true, tableVersion = 6)),
+        Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = false, tableVersion = 6)),
+        Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = true, tableVersion = 8)),
+        Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = false, tableVersion = 8))
)
: _*)
}
@@ -470,11 +475,19 @@ object ColumnStatIndexTestBase {
def testTableTypePartitionTypeParams: java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
Seq(
- Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8"),
+ // Table version 6
+ Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8", "6"),
// empty partition col represents non-partitioned table.
- Arguments.arguments(HoodieTableType.COPY_ON_WRITE, ""),
- Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8"),
- Arguments.arguments(HoodieTableType.MERGE_ON_READ, "")
+ Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "", "6"),
+ Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8", "6"),
+ Arguments.arguments(HoodieTableType.MERGE_ON_READ, "", "6"),
+
+ // Table version 8
+ Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "c8", "8"),
+ // empty partition col represents non-partitioned table.
+ Arguments.arguments(HoodieTableType.COPY_ON_WRITE, "", "8"),
+ Arguments.arguments(HoodieTableType.MERGE_ON_READ, "c8", "8"),
+ Arguments.arguments(HoodieTableType.MERGE_ON_READ, "", "8")
)
: _*)
}
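Each provider above now emits its previous argument matrix crossed with table versions 6 and 8. For reference, a hedged Java-side sketch of consuming such a provider with JUnit 5 (the test body is illustrative):

    @ParameterizedTest
    @MethodSource("testMetadataColumnStatsIndexParams")
    void runCase(ColumnStatsTestCase testCase) {
      // testCase carries tableVersion = 6 or 8, which the tests thread into
      // hoodie.write.table.version so both table formats are exercised.
    }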
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 35fdc8b041e..872bca957a7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -37,7 +37,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase {
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true"
)
- val commonOpts: Map[String, String] = Map(
+ def commonOpts: Map[String, String] = Map(
PARTITIONPATH_FIELD.key -> "partition",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15"
@@ -94,7 +94,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase {
if (!metaClientReloaded) {
      // initialization of meta client is required again after writing data so that
// latest table configs are picked up
- metaClient = HoodieTableMetaClient.reload(metaClient)
+      metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
metaClientReloaded = true
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 086443e0685..44ae5d05447 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -26,7 +26,8 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieStorageConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.table.timeline.versioning.v1.InstantFileNameGeneratorV1
import org.apache.hudi.common.table.view.FileSystemViewManager
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR
@@ -50,7 +51,7 @@ import org.apache.spark.sql.types._
import org.junit.jupiter.api._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{EnumSource, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
import java.util.Collections
import java.util.stream.Collectors
@@ -80,7 +81,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
RECORDKEY_FIELD.key -> "c1",
PRECOMBINE_FIELD.key -> "c1",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
- "hoodie.compact.inline.max.delta.commits" -> "10"
+ "hoodie.compact.inline.max.delta.commits" -> "10",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString
) ++ metadataOpts
    doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts,
@@ -171,7 +173,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
shouldValidateColStats = false,
shouldValidateManually = false))
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
validateNonExistantColumnsToIndexDefn(metaClient)
}
@@ -194,7 +196,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
RECORDKEY_FIELD.key -> "c1",
PRECOMBINE_FIELD.key -> "c1",
- HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString
) ++ metadataOpts
    var expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) {
@@ -239,8 +242,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
@ParameterizedTest
@MethodSource(Array("testTableTypePartitionTypeParams"))
-  def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: HoodieTableType, partitionCol : String): Unit = {
-    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+  def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: HoodieTableType, partitionCol : String, tableVersion: Int): Unit = {
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion)
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false"
@@ -255,7 +258,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
PRECOMBINE_FIELD.key -> "c1",
PARTITIONPATH_FIELD.key -> partitionCol,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
- HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "5"
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "5",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString
) ++ metadataOpts
// inserts
@@ -305,7 +309,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
"index/colstats/mor-bootstrap1-column-stats-index-table.json"
}
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
    val latestCompletedCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().requestedTime
    // let's validate that we have log files generated in case of a MOR table
@@ -355,8 +359,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
@ParameterizedTest
@MethodSource(Array("testTableTypePartitionTypeParams"))
-  def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType: HoodieTableType, partitionCol : String): Unit = {
-    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+  def testMetadataColumnStatsIndexInitializationWithRollbacks(tableType: HoodieTableType, partitionCol : String, tableVersion: Int): Unit = {
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion)
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false"
@@ -371,7 +375,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
PRECOMBINE_FIELD.key -> "c1",
PARTITIONPATH_FIELD.key() -> partitionCol,
"hoodie.write.markers.type" -> "DIRECT",
- HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString
) ++ metadataOpts
// inserts
@@ -412,7 +417,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
"index/colstats/mor-bootstrap-rollback1-column-stats-index-table.json"
}
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
    val latestCompletedCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().requestedTime
    // updates a subset which are not deleted and enable col stats and validate bootstrap
@@ -426,7 +431,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0)
validateColumnsToIndex(metaClient, DEFAULT_COLUMNS_TO_INDEX)
@@ -434,7 +439,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
  def simulateFailureForLatestCommit(tableType: HoodieTableType, partitionCol: String) : Unit = {
// simulate failure for latest commit.
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
var baseFileName : String = null
var logFileName : String = null
    val lastCompletedCommit = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
@@ -452,12 +457,18 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
} else {
        metaClient.getStorage.listFiles(new StoragePath(metaClient.getBasePath, "9"))
}
-      val baseFileFileStatus = dataFiles.stream().filter(fileStatus => fileStatus.getPath.getName.contains(lastCompletedCommit.requestedTime)).findFirst().get()
+      val baseFileFileStatus = dataFiles.stream().filter(fileStatus => fileStatus.getPath.getName.contains(lastCompletedCommit.requestedTime)
+        && fileStatus.getPath.getName.contains("parquet")).findFirst().get()
baseFileName = baseFileFileStatus.getPath.getName
}
-    val latestCompletedFileName = INSTANT_FILE_NAME_GENERATOR.getFileName(lastCompletedCommit)
-    metaClient.getStorage.deleteFile(new StoragePath(metaClient.getBasePath.toString + "/.hoodie/timeline/" + latestCompletedFileName))
+    if (metaClient.getTableConfig.getTableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+      val latestCompletedFileName = new InstantFileNameGeneratorV1().getFileName(lastCompletedCommit)
+      metaClient.getStorage.deleteFile(new StoragePath(metaClient.getBasePath.toString + "/.hoodie/" + latestCompletedFileName))
+    } else {
+      val latestCompletedFileName = INSTANT_FILE_NAME_GENERATOR.getFileName(lastCompletedCommit)
+      metaClient.getStorage.deleteFile(new StoragePath(metaClient.getBasePath.toString + "/.hoodie/timeline/" + latestCompletedFileName))
+    }
// re-create marker for the deleted file.
if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -475,11 +486,12 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
}
}
- @Test
- def testMORDeleteBlocks(): Unit = {
+ @ParameterizedTest
+ @CsvSource(value = Array("6", "8"))
+ def testMORDeleteBlocks(tableVersion: Int): Unit = {
val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ
val partitionCol = "c8"
- val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion)
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
@@ -494,7 +506,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
PRECOMBINE_FIELD.key -> "c1",
PARTITIONPATH_FIELD.key() -> partitionCol,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
- HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5"
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "5",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString
) ++ metadataOpts
// inserts
@@ -533,10 +546,10 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
}
@ParameterizedTest
- @ValueSource(strings = Array("", "c8"))
- def testColStatsWithCleanCOW(partitionCol: String): Unit = {
+ @CsvSource(value = Array("'',6", "'',8", "c8,6", "c8,8"))
+  def testColStatsWithCleanCOW(partitionCol: String, tableVersion: Int): Unit = {
val tableType: HoodieTableType = HoodieTableType.COPY_ON_WRITE
- val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion)
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true"
)
@@ -550,7 +563,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
PRECOMBINE_FIELD.key -> "c1",
PARTITIONPATH_FIELD.key() -> partitionCol,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
- HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1"
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString
) ++ metadataOpts
// inserts
@@ -598,10 +612,10 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
}
@ParameterizedTest
- @ValueSource(strings = Array("", "c8"))
- def testColStatsWithCleanMOR(partitionCol: String): Unit = {
+ @CsvSource(value = Array("'',6", "'',8", "c8,6", "c8,8"))
+  def testColStatsWithCleanMOR(partitionCol: String, tableVersion: Int): Unit = {
val tableType: HoodieTableType = HoodieTableType.MERGE_ON_READ
- val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true, tableVersion)
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true"
)
@@ -616,7 +630,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
PARTITIONPATH_FIELD.key() -> partitionCol,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1",
- HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2"
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> testCase.tableVersion.toString
) ++ metadataOpts
// inserts
@@ -662,13 +677,13 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
parquetMaxFileSize = 100 * 1024 * 1024,
smallFileLimit = 0))
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
    assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0)
}
@ParameterizedTest
- @EnumSource(classOf[HoodieTableType])
-  def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType): Unit = {
+  @CsvSource(value = Array("COPY_ON_WRITE,6", "MERGE_ON_READ,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,8"))
+  def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType, tableVersion: Int): Unit = {
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
@@ -681,7 +696,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
RECORDKEY_FIELD.key -> "c1",
PRECOMBINE_FIELD.key -> "c1",
- HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString
) ++ metadataOpts
    val schema = StructType(StructField("c1", IntegerType, false) :: StructField("c2", StringType, true) :: Nil)
@@ -698,7 +714,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(metadataOpts))
@@ -715,8 +731,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
}
@ParameterizedTest
- @ValueSource(booleans = Array(true, false))
-  def testMetadataColumnStatsWithFilesFilter(shouldReadInMemory: Boolean): Unit = {
+  @CsvSource(value = Array("true,6", "false,6", "true,8", "false,8"))
+  def testMetadataColumnStatsWithFilesFilter(shouldReadInMemory: Boolean, tableVersion: Int): Unit = {
val targetColumnsToIndex = Seq("c1", "c2", "c3")
val metadataOpts = Map(
@@ -733,7 +749,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
PRECOMBINE_FIELD.key -> "c1",
PARTITIONPATH_FIELD.key() -> "c8",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
- HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true"
+ HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true",
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString
) ++ metadataOpts
    val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json-partition-pruning").toString
@@ -752,7 +769,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(metadataOpts))
@@ -811,8 +828,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
}
@ParameterizedTest
- @ValueSource(booleans = Array(true, false))
-  def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean): Unit = {
+  @CsvSource(value = Array("true,6", "false,6", "true,8", "false,8"))
+  def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean, tableVersion: Int): Unit = {
var targetColumnsToIndex = Seq("c1", "c2", "c3")
val metadataOpts = Map(
@@ -828,7 +845,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
RECORDKEY_FIELD.key -> "c1",
PRECOMBINE_FIELD.key -> "c1",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
- HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true"
+ HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true",
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString
) ++ metadataOpts
    val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
@@ -847,7 +865,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(metadataOpts))
@@ -896,7 +914,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
.mode(SaveMode.Append)
.save(basePath)
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
    val requestedColumns = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS)
      .getSourceFields.toSeq.filterNot(colName => colName.startsWith("_hoodie")).sorted.toSeq
@@ -925,8 +943,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
}
@ParameterizedTest
- @ValueSource(booleans = Array(true, false))
-  def testTranslateQueryFiltersIntoColumnStatsIndexFilterExpr(shouldReadInMemory: Boolean): Unit = {
+  @CsvSource(value = Array("true,6", "false,6", "true,8", "false,8"))
+  def testTranslateQueryFiltersIntoColumnStatsIndexFilterExpr(shouldReadInMemory: Boolean, tableVersion: Int): Unit = {
val targetColumnsToIndex = Seq("c1", "c2", "c3")
val metadataOpts = Map(
@@ -941,7 +959,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
RECORDKEY_FIELD.key -> "c1",
PRECOMBINE_FIELD.key -> "c1",
- HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString
) ++ metadataOpts
    val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
@@ -960,7 +979,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
- metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(metadataOpts))
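One version-specific detail from simulateFailureForLatestCommit is worth calling out: completed instant files live directly under .hoodie/ for tables below version eight and under .hoodie/timeline/ from version eight on, which is why the test branches on the table version and uses InstantFileNameGeneratorV1 for the older layout. The same dispatch, isolated as a hedged Java fragment:

    // Pre-v8 layout: .hoodie/<instant file>; v8 and later: .hoodie/timeline/<instant file>.
    String timelineDir = metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)
        ? "/.hoodie/" : "/.hoodie/timeline/";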
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index e91c75f6e32..5411dc8c6c3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -123,6 +123,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
.save(basePath)
val deletedDf3 = calculateMergedDf(latestBatchDf, operation, true)
deletedDf3.cache()
+    metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
validateDataAndRecordIndices(hudiOpts, deletedDf3)
}
@@ -419,14 +420,10 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
}
@ParameterizedTest
- @CsvSource(value = Array(
- "COPY_ON_WRITE,6", "COPY_ON_WRITE,8", "MERGE_ON_READ,6", "MERGE_ON_READ,8"
- ))
- def testRLIWithDTClustering(tableType: String, tableVersion: Int): Unit = {
+ @EnumSource(classOf[HoodieTableType])
+ def testRLIWithDTClustering(tableType: HoodieTableType): Unit = {
val hudiOpts = commonOpts ++ Map(
- DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
- HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> tableVersion.toString,
- HoodieTableConfig.VERSION.key() -> tableVersion.toString,
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2"
)
@@ -434,7 +431,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
for ((k, v) <- hudiOpts) {
props.put(k, v)
}
- initMetaClient(HoodieTableType.valueOf(tableType), props)
+ initMetaClient(tableType, props)
doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
@@ -455,6 +452,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
    assertTrue(getLatestClusteringInstant().get().requestedTime.compareTo(lastClusteringInstant.get().requestedTime) > 0)
    assertEquals(getLatestClusteringInstant(), metaClient.getActiveTimeline.lastInstant())
+ validateDataAndRecordIndices(hudiOpts)
// We are validating rollback of a DT clustering instant here
rollbackLastInstant(hudiOpts)
validateDataAndRecordIndices(hudiOpts)
@@ -545,6 +543,9 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
+ doWriteAndValidateDataAndRecordIndex(hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append)
    val metadataTableFSView = getHoodieTable(metaClient, getWriteConfig(hudiOpts)).getMetadataTable
      .asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView
    val compactionTimeline = metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexTableVersionSix.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexTableVersionSix.scala
new file mode 100644
index 00000000000..71e31057b7c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexTableVersionSix.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+
+class TestRecordLevelIndexTableVersionSix extends TestRecordLevelIndex {
+ override def commonOpts: Map[String, String] = super.commonOpts ++ Map(
+ HoodieTableConfig.VERSION.key() -> "6",
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> "6"
+ )
+}
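This new suite reruns every test in TestRecordLevelIndex against table version six purely by overriding commonOpts; changing commonOpts from a val to a def in RecordLevelIndexTestBase (earlier in this patch) is what makes that override possible.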