This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b634cfc47ee0dadf5da8ae9bca42a739daccca39 Author: Y Ethan Guo <[email protected]> AuthorDate: Wed Oct 22 23:00:38 2025 +0800 fix: Handle deletes and updates properly in secondary index (#14090) --- .../metadata/HoodieBackedTableMetadataWriter.java | 34 +++-- .../hudi/metadata/MetadataIndexGenerator.java | 132 ------------------- .../apache/hudi/metadata/MetadataIndexMapper.java | 66 ++++++++++ .../apache/hudi/metadata/RecordIndexMapper.java | 84 ++++++++++++ .../apache/hudi/metadata/SecondaryIndexMapper.java | 68 ++++++++++ .../SecondaryIndexRecordGenerationUtils.java | 13 +- ...Generator.java => TestMetadataIndexMapper.java} | 13 +- .../FlinkHoodieBackedTableMetadataWriter.java | 5 - .../JavaHoodieBackedTableMetadataWriter.java | 5 - .../SparkHoodieBackedTableMetadataWriter.java | 4 - ...ieBackedTableMetadataWriterTableVersionSix.java | 5 - .../hudi/metadata/HoodieTableMetadataUtil.java | 40 ++++-- .../functional/TestSecondaryIndexPruning.scala | 142 ++++++++++++++++++++- 13 files changed, 424 insertions(+), 187 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 5d172095affd..51d5a9016081 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -180,7 +180,6 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab boolean initialized = false; private HoodieTableFileSystemView metadataView; private final boolean streamingWritesEnabled; - private final Option<MetadataIndexGenerator> metadataIndexGenerator; protected HoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf, HoodieWriteConfig writeConfig, @@ -225,18 +224,12 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab } ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT Reader should have been opened post initialization"); this.streamingWritesEnabled = streamingWritesEnabled; - this.metadataIndexGenerator = streamingWritesEnabled ? Option.of(initializeMetadataIndexGenerator()) : Option.empty(); } List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig metadataConfig, HoodieTableMetaClient metaClient) { return MetadataPartitionType.getEnabledPartitions(metadataConfig, metaClient); } - /** - * Returns the utilities for metadata index generation. - */ - abstract MetadataIndexGenerator initializeMetadataIndexGenerator(); - private void mayBeReinitMetadataReader() { if (metadata == null || metadataMetaClient == null || metadata.getMetadataFileSystemView() == null) { initMetadataReader(); @@ -1239,12 +1232,31 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab } maybeInitializeNewFileGroupsForPartitionedRLI(writeStatus, instantTime); - HoodieData<HoodieRecord> untaggedRecords = writeStatus.flatMap( - new MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(mdtPartitionsToTag, dataWriteConfig)); + + Map<MetadataPartitionType, MetadataIndexMapper> indexMapperMap = mdtPartitionsToTag.stream() + .filter(e -> e.equals(RECORD_INDEX) || e.equals(SECONDARY_INDEX)) + .collect(Collectors.toMap( + key -> key, + key -> { + if (RECORD_INDEX.equals(key)) { + return new RecordIndexMapper(dataWriteConfig); + } + return new SecondaryIndexMapper(dataWriteConfig); + } + )); + + if (indexMapperMap.isEmpty()) { + return engineContext.emptyHoodieData(); + } + + HoodieData<HoodieRecord> processedRecords = indexMapperMap.values().stream() + .map(indexMapper -> indexMapper.postProcess(writeStatus.flatMap(indexMapper))) + .reduce(HoodieData::union) + .get(); // tag records w/ location - Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords = tagRecordsWithLocationForStreamingWrites(untaggedRecords, - mdtPartitionPathsToTag); + Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords = + tagRecordsWithLocationForStreamingWrites(processedRecords, mdtPartitionPathsToTag); // write partial writes to MDT table (for those partitions where streaming writes are enabled) HoodieData<WriteStatus> writeStatusCollection = convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords, instantTime)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java deleted file mode 100644 index 87fbbec05a45..000000000000 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.metadata; - -import org.apache.hudi.client.SecondaryIndexStats; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.function.SerializableFunction; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordDelegate; -import org.apache.hudi.common.model.HoodieRecordLocation; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieMetadataException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; -import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX; - -/** - * For now this is a placeholder to generate all MDT records in one place. - * Once <a href="https://github.com/apache/hudi/pull/13226">Refactor MDT update logic with Indexer</a> is landed, - * we will leverage the new abstraction to generate MDT records. - */ -public class MetadataIndexGenerator implements Serializable { - - private static final Logger LOG = LoggerFactory.getLogger(MetadataIndexGenerator.class); - - /** - * MDT record transformation utility. This function is expected to be invoked from a map Partition call, where one spark task will receive - * one WriteStatus as input and the output contains prepared MDT table records for all eligible partitions that can operate on one - * WriteStatus instance only. - */ - static class WriteStatusBasedMetadataIndexMapper implements SerializableFunction<WriteStatus, Iterator<HoodieRecord>> { - private final List<MetadataPartitionType> enabledPartitionTypes; - private final HoodieWriteConfig dataWriteConfig; - - public WriteStatusBasedMetadataIndexMapper(List<MetadataPartitionType> enabledPartitionTypes, HoodieWriteConfig dataWriteConfig) { - this.enabledPartitionTypes = enabledPartitionTypes; - this.dataWriteConfig = dataWriteConfig; - } - - @Override - public Iterator<HoodieRecord> apply(WriteStatus writeStatus) throws Exception { - List<HoodieRecord> allRecords = new ArrayList<>(); - if (enabledPartitionTypes.contains(RECORD_INDEX)) { - allRecords.addAll(processWriteStatusForRLI(writeStatus, dataWriteConfig)); - } - if (enabledPartitionTypes.contains(SECONDARY_INDEX)) { - allRecords.addAll(processWriteStatusForSecondaryIndex(writeStatus)); - } - // yet to add support for more partitions. - // bloom filter - // secondary index - // expression index. - return allRecords.iterator(); - } - } - - protected static List<HoodieRecord> processWriteStatusForRLI(WriteStatus writeStatus, HoodieWriteConfig dataWriteConfig) { - List<HoodieRecord> allRecords = new ArrayList<>(); - for (HoodieRecordDelegate recordDelegate : writeStatus.getIndexStats().getWrittenRecordDelegates()) { - if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) { - if (recordDelegate.getIgnoreIndexUpdate()) { - continue; - } - HoodieRecord hoodieRecord; - Option<HoodieRecordLocation> newLocation = recordDelegate.getNewLocation(); - if (newLocation.isPresent()) { - if (recordDelegate.getCurrentLocation().isPresent()) { - // This is an update, no need to update index if the location has not changed - // newLocation should have the same fileID as currentLocation. The instantTimes differ as newLocation's - // instantTime refers to the current commit which was completed. - if (!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId())) { - final String msg = String.format("Detected update in location of record with key %s from %s to %s. The fileID should not change.", - recordDelegate, recordDelegate.getCurrentLocation().get(), newLocation.get()); - LOG.error(msg); - throw new HoodieMetadataException(msg); - } - // for updates, we can skip updating RLI partition in MDT - } else { - // Insert new record case - hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate( - recordDelegate.getRecordKey(), recordDelegate.getPartitionPath(), - newLocation.get().getFileId(), newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding()); - allRecords.add(hoodieRecord); - } - } else { - // Delete existing index for a deleted record - hoodieRecord = HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey(), recordDelegate.getPartitionPath(), dataWriteConfig.isPartitionedRecordIndexEnabled()); - allRecords.add(hoodieRecord); - } - } - } - return allRecords; - } - - protected static List<HoodieRecord> processWriteStatusForSecondaryIndex(WriteStatus writeStatus) { - List<HoodieRecord> secondaryIndexRecords = new ArrayList<>(writeStatus.getIndexStats().getSecondaryIndexStats().size()); - for (Map.Entry<String, List<SecondaryIndexStats>> entry : writeStatus.getIndexStats().getSecondaryIndexStats().entrySet()) { - String indexPartitionName = entry.getKey(); - List<SecondaryIndexStats> secondaryIndexStats = entry.getValue(); - for (SecondaryIndexStats stat : secondaryIndexStats) { - secondaryIndexRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(stat.getRecordKey(), stat.getSecondaryKeyValue(), indexPartitionName, stat.isDeleted())); - } - } - return secondaryIndexRecords; - } -} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexMapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexMapper.java new file mode 100644 index 000000000000..0b453de2b475 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexMapper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +/** + * Abstract base class for metadata index mappers. Each index type can extend this to implement + * its own record generation logic. + */ +public abstract class MetadataIndexMapper implements SerializableFunction<WriteStatus, Iterator<HoodieRecord>>, Serializable { + protected final HoodieWriteConfig dataWriteConfig; + + public MetadataIndexMapper(HoodieWriteConfig dataWriteConfig) { + this.dataWriteConfig = dataWriteConfig; + } + + /** + * Generates metadata index records from a WriteStatus. + * + * @param writeStatus the write status to process + * @return list of metadata records + */ + protected abstract List<HoodieRecord> generateRecords(WriteStatus writeStatus); + + /** + * Post-processes the generated records. Default implementation returns records as-is. + * Subclasses can override to add deduplication, validation, or other transformations. + * + * @param records the generated records + * @return post-processed records + */ + public HoodieData<HoodieRecord> postProcess(HoodieData<HoodieRecord> records) { + return records; + } + + @Override + public final Iterator<HoodieRecord> apply(WriteStatus writeStatus) throws Exception { + return generateRecords(writeStatus).iterator(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java new file mode 100644 index 000000000000..13029bca3df9 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordDelegate; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Mapper for Record Level Index (RLI). + */ +public class RecordIndexMapper extends MetadataIndexMapper { + private static final Logger LOG = LoggerFactory.getLogger(RecordIndexMapper.class); + + public RecordIndexMapper(HoodieWriteConfig dataWriteConfig) { + super(dataWriteConfig); + } + + @Override + protected List<HoodieRecord> generateRecords(WriteStatus writeStatus) { + List<HoodieRecord> allRecords = new ArrayList<>(); + for (HoodieRecordDelegate recordDelegate : writeStatus.getIndexStats().getWrittenRecordDelegates()) { + if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) { + if (recordDelegate.getIgnoreIndexUpdate()) { + continue; + } + HoodieRecord hoodieRecord; + Option<HoodieRecordLocation> newLocation = recordDelegate.getNewLocation(); + if (newLocation.isPresent()) { + if (recordDelegate.getCurrentLocation().isPresent()) { + // This is an update, no need to update index if the location has not changed + // newLocation should have the same fileID as currentLocation. The instantTimes differ as newLocation's + // instantTime refers to the current commit which was completed. + if (!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId())) { + final String msg = String.format("Detected update in location of record with key %s from %s to %s. The fileID should not change.", + recordDelegate, recordDelegate.getCurrentLocation().get(), newLocation.get()); + LOG.error(msg); + throw new HoodieMetadataException(msg); + } + // for updates, we can skip updating RLI partition in MDT + } else { + // Insert new record case + hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate( + recordDelegate.getRecordKey(), recordDelegate.getPartitionPath(), + newLocation.get().getFileId(), newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding()); + allRecords.add(hoodieRecord); + } + } else { + // Delete existing index for a deleted record + hoodieRecord = HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey(), recordDelegate.getPartitionPath(), dataWriteConfig.isPartitionedRecordIndexEnabled()); + allRecords.add(hoodieRecord); + } + } + } + return allRecords; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexMapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexMapper.java new file mode 100644 index 000000000000..b05901392057 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexMapper.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.client.SecondaryIndexStats; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.reduceByKeys; + +public class SecondaryIndexMapper extends MetadataIndexMapper { + public SecondaryIndexMapper(HoodieWriteConfig dataWriteConfig) { + super(dataWriteConfig); + } + + @Override + protected List<HoodieRecord> generateRecords(WriteStatus writeStatus) { + List<HoodieRecord> secondaryIndexRecords = new ArrayList<>(writeStatus.getIndexStats().getSecondaryIndexStats().size()); + for (Map.Entry<String, List<SecondaryIndexStats>> entry : writeStatus.getIndexStats().getSecondaryIndexStats().entrySet()) { + String indexPartitionName = entry.getKey(); + List<SecondaryIndexStats> secondaryIndexStats = entry.getValue(); + for (SecondaryIndexStats stat : secondaryIndexStats) { + secondaryIndexRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(stat.getRecordKey(), stat.getSecondaryKeyValue(), indexPartitionName, stat.isDeleted())); + } + } + return secondaryIndexRecords; + } + + /** + * Post-processes secondary index records by deduplicating records with the same metadata record key. + * This handles partition path updates where the same record generates both DELETE and INSERT. + * Note that unlike record index, the record may have both partition path update and secondary key + * update; in this case, the delete record to the secondary index has to be honored. This is + * guaranteed as part of the reduceByKeys operation as in such a case, the DELETE and INSERT + * to the secondary index have different metadata record key. + */ + @Override + public HoodieData<HoodieRecord> postProcess(HoodieData<HoodieRecord> records) { + // Deduplicate by metadata record key (secondaryKey$recordKey) + // usePartitionInKey = false because SI keys are globally unique + int parallelism = Math.max( + records.getNumPartitions(), dataWriteConfig.getMetadataConfig().getSecondaryIndexParallelism()); + return reduceByKeys(records, parallelism, false); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java index 2a6599244e05..1bf39c639d9e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java @@ -84,7 +84,6 @@ public class SecondaryIndexRecordGenerationUtils { * @param instantTime instant time * @param indexDefinition secondary index definition * @param metadataConfig metadata config - * @param fsView file system view as of instant time * @param dataMetaClient data table meta client * @param engineContext engine context * @param writeConfig hoodie write config. @@ -97,8 +96,7 @@ public class SecondaryIndexRecordGenerationUtils { HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataMetaClient, HoodieEngineContext engineContext, - HoodieWriteConfig writeConfig - ) { + HoodieWriteConfig writeConfig) { TypedProperties props = writeConfig.getProps(); // Secondary index cannot support logs having inserts with current offering. So, lets validate that. if (allWriteStats.stream().anyMatch(writeStat -> { @@ -118,7 +116,7 @@ public class SecondaryIndexRecordGenerationUtils { int parallelism = Math.max(Math.min(writeStatsByFileId.size(), metadataConfig.getSecondaryIndexParallelism()), 1); ReaderContextFactory<T> readerContextFactory = engineContext.getReaderContextFactory(dataMetaClient); - return engineContext.parallelize(new ArrayList<>(writeStatsByFileId.entrySet()), parallelism).flatMap(writeStatsByFileIdEntry -> { + HoodieData<HoodieRecord> secondaryIndexRecords = engineContext.parallelize(new ArrayList<>(writeStatsByFileId.entrySet()), parallelism).flatMap(writeStatsByFileIdEntry -> { String fileId = writeStatsByFileIdEntry.getKey(); List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue(); String partition = writeStats.get(0).getPartitionPath(); @@ -199,6 +197,13 @@ public class SecondaryIndexRecordGenerationUtils { }); return records.iterator(); }); + + // Deduplicate secondary index records by grouping by the secondary index key + // (secondaryKey$recordKey). This handles the case where a record moves from one file group to + // another (partition path update), which generates both a delete (from old fileId) and an + // insert (to new fileId). Similar to how Record Level Index handles partition path update, + // we prefer non-deleted records. + return HoodieTableMetadataUtil.reduceByKeys(secondaryIndexRecords, parallelism, false); } private static TableFileSystemView.SliceView getSliceView(HoodieWriteConfig config, HoodieTableMetaClient dataMetaClient) { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexMapper.java similarity index 85% rename from hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java rename to hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexMapper.java index 188e93099ffe..4fe658ce3dcc 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexMapper.java @@ -32,7 +32,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.junit.jupiter.api.Test; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -42,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestMetadataIndexGenerator extends HoodieCommonTestHarness { +public class TestMetadataIndexMapper extends HoodieCommonTestHarness { @Test public void testRLIIndexMapperWithInsertsAndUpserts() throws Exception { @@ -66,9 +65,8 @@ public class TestMetadataIndexGenerator extends HoodieCommonTestHarness { HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() .withPath("random") .build(); - MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper writeStatusBasedMetadataIndexMapper = new MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper( - Collections.singletonList(MetadataPartitionType.RECORD_INDEX), writeConfig); - Iterator<HoodieRecord> rliRecords = writeStatusBasedMetadataIndexMapper.apply(writeStatus); + RecordIndexMapper recordIndexMapper = new RecordIndexMapper(writeConfig); + Iterator<HoodieRecord> rliRecords = recordIndexMapper.apply(writeStatus); AtomicInteger totalRLIRecords = new AtomicInteger(); rliRecords.forEachRemaining(rliRecord -> { totalRLIRecords.getAndIncrement(); @@ -103,9 +101,8 @@ public class TestMetadataIndexGenerator extends HoodieCommonTestHarness { HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() .withPath("random") .build(); - MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper writeStatusBasedMetadataIndexMapper = new MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper( - Collections.singletonList(MetadataPartitionType.RECORD_INDEX), writeConfig); - Iterator<HoodieRecord> rliRecords = writeStatusBasedMetadataIndexMapper.apply(writeStatus); + RecordIndexMapper recordIndexMapper = new RecordIndexMapper(writeConfig); + Iterator<HoodieRecord> rliRecords = recordIndexMapper.apply(writeStatus); AtomicInteger totalRLIRecords = new AtomicInteger(); rliRecords.forEachRemaining(rliRecord -> { totalRLIRecords.getAndIncrement(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index aa2c04db44e4..e9a947a4d5a4 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -168,11 +168,6 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad throw new UnsupportedOperationException("Not implemented for Flink engine yet"); } - @Override - MetadataIndexGenerator initializeMetadataIndexGenerator() { - throw new UnsupportedOperationException("Streaming writes are not supported for Flink"); - } - @Override protected EngineType getEngineType() { return EngineType.FLINK; diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java index 16cfe2835d35..41ecca1fe7f1 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java @@ -63,11 +63,6 @@ public class JavaHoodieBackedTableMetadataWriter extends HoodieBackedTableMetada super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp); } - @Override - MetadataIndexGenerator initializeMetadataIndexGenerator() { - throw new UnsupportedOperationException("Streaming writes are not supported for Java"); - } - public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index d36e59728e90..7b79be758776 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -288,10 +288,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad return exprIndexRecords; } - protected MetadataIndexGenerator initializeMetadataIndexGenerator() { - return new MetadataIndexGenerator(); - } - protected SparkRDDMetadataWriteClient getSparkWriteClient(Option<BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>>> writeClientOpt) { return ((SparkRDDMetadataWriteClient) writeClientOpt.orElse(getWriteClient())); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java index 587759829f83..70e3cfb7ac3b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java @@ -95,11 +95,6 @@ public class SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp); } - @Override - MetadataIndexGenerator initializeMetadataIndexGenerator() { - throw new UnsupportedOperationException("Streaming writes are not supported for Spark table version six"); - } - @Override protected void initRegistry() { if (metadataWriteConfig.isMetricsOn()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index ab999f7589ef..39cd361b7102 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1089,25 +1089,40 @@ public class HoodieTableMetadataUtil { } /** - * There are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition) - * So, this method performs reduce by key to ignore the deleted entry. - * @param recordIndexRecords hoodie records after rli index lookup. - * @param parallelism parallelism to use. - * @return + * Reduces metadata table index records by their metadata record key to ensure only a single + * update per metadata record key. + * <p> + * This is critical for handling scenarios where the same metadata record key appears multiple times: + * - Partition movements: A data record moving from one partition to another generates both + * a delete entry (from old partition) and an insert entry (to new partition) + * - File group movements: A record moving from one file group to another + * - When usePartitionInKey is true: Groups by (metadata record key, partition path) tuple + * - When usePartitionInKey is false: Groups by metadata record key only + * <p> + * The reduce logic prefers non-deleted records over deleted ones to properly handle movements. + * For Record Level Index: When both records are non-deleted (indicating duplicate inserts for + * the same key), an exception is thrown since this represents an invalid state. + * For Secondary Index: When both records are non-deleted, the later record (record2) is preferred. + * + * @param metadataRecords {@link HoodieData} of metadata table index records to be reduced by keys + * @param parallelism parallelism for the reduce-by operation + * @param usePartitionInKey whether to use partition path as part of the grouping key + * (true for partitioned RLI, false for non-partitioned RLI and SI) + * @return HoodieData of deduplicated metadata records with one record per metadata record key */ @VisibleForTesting - public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism, boolean isPartitionedRLI) { - HoodiePairData<HoodieKey, HoodieRecord> recordIndexRecordsPair; - if (isPartitionedRLI) { - recordIndexRecordsPair = recordIndexRecords.mapToPair(r -> { + public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> metadataRecords, int parallelism, boolean usePartitionInKey) { + HoodiePairData<HoodieKey, HoodieRecord> metadataRecordsPair; + if (usePartitionInKey) { + metadataRecordsPair = metadataRecords.mapToPair(r -> { String recordPartitionPath = r.isDelete(DELETE_CONTEXT, CollectionUtils.emptyProps()) ? ((EmptyHoodieRecordPayloadWithPartition) r.getData()).getPartitionPath() : ((HoodieMetadataPayload) r.getData()).getDataPartition(); return Pair.of(new HoodieKey(r.getRecordKey(), recordPartitionPath), r); }); } else { - recordIndexRecordsPair = recordIndexRecords.mapToPair(r -> Pair.of(r.getKey(), r)); + metadataRecordsPair = metadataRecords.mapToPair(r -> Pair.of(r.getKey(), r)); } - return recordIndexRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, HoodieRecord>) (record1, record2) -> { + return metadataRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, HoodieRecord>) (record1, record2) -> { boolean isRecord1Deleted = record1.isDelete(DELETE_CONTEXT, CollectionUtils.emptyProps()); boolean isRecord2Deleted = record2.isDelete(DELETE_CONTEXT, CollectionUtils.emptyProps()); if (isRecord1Deleted && !isRecord2Deleted) { @@ -1118,7 +1133,8 @@ public class HoodieTableMetadataUtil { // let's delete just 1 of them return record1; } else { - throw new HoodieIOException("Two HoodieRecord updates to RLI is seen for same record key " + record2.getRecordKey() + ", record 1 : " + // Both records are non-deleted + throw new HoodieIOException("Two HoodieRecord updates to the index is seen for same record key " + record2.getRecordKey() + ", record 1 : " + record1 + ", record 2 : " + record2); } }, parallelism).values(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index 18cbb29163bb..2bbaf74ce588 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.types.StringType import org.junit.jupiter.api.{Tag, Test} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource} +import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource} import org.junit.jupiter.params.provider.Arguments.arguments import org.scalatest.Assertions.{assertResult, assertThrows} @@ -1767,6 +1767,146 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { ) } + /** + * Test Secondary Index with partition path update using global record index. + * This test validates that when a record moves from one partition (file group) to another + * using global index, the secondary index is correctly updated and queries work as expected. + * + * Test flow: + * 1. Create a table with global index enabled + * 2. Insert records into different partitions with a secondary index + * 3. Update partition path of a record (moving it from partition A to B) + * 4. Validate secondary index metadata is correct (no duplicates, no missing entry) + * 5. Validate query results using secondary index pruning + */ + @ParameterizedTest + @CsvSource(Array("COPY_ON_WRITE,true", "COPY_ON_WRITE,false", "MERGE_ON_READ,true", "MERGE_ON_READ,false")) + def testSecondaryIndexWithPartitionPathUpdateUsingGlobalIndex(tableType: HoodieTableType, + enableStreamingWrite: Boolean): Unit = { + val sqlTableType = if (tableType == HoodieTableType.COPY_ON_WRITE) "cow" else "mor" + val tableName = "test_secondary_index_with_partition_update_global_index_" + sqlTableType + "_" + enableStreamingWrite + + spark.sql( + s""" + |create table $tableName ( + | ts bigint, + | record_key_col string, + | not_record_key_col string, + | partition_key_col string + |) using hudi + | options ( + | primaryKey = 'record_key_col', + | type = '$sqlTableType', + | hoodie.metadata.enable = 'true', + | hoodie.metadata.record.index.enable = 'true', + | hoodie.datasource.write.recordkey.field = 'record_key_col', + | hoodie.enable.data.skipping = 'true', + | hoodie.datasource.write.payload.class = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload", + | hoodie.index.type = 'RECORD_INDEX', + | hoodie.record.index.update.partition.path = 'true', + | hoodie.metadata.streaming.write.enabled = '$enableStreamingWrite' + | ) + | partitioned by (partition_key_col) + | location '$basePath' + """.stripMargin) + + spark.sql("set hoodie.parquet.small.file.limit=0") + // Insert initial records into different partitions + spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')") + spark.sql(s"insert into $tableName values(2, 'row2', 'def', 'p2')") + spark.sql(s"insert into $tableName values(3, 'row3', 'ghi', 'p2')") + spark.sql(s"insert into $tableName values(4, 'row4', 'ghi', 'p2')") + + // Create secondary index + spark.sql(s"create index idx_not_record_key_col on $tableName (not_record_key_col)") + + // Validate index created successfully + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(HoodieTestUtils.getDefaultStorageConf) + .build() + assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col")) + + // Validate the secondary index records before partition update + checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( + Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false), + Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), + Seq(s"ghi${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false), + Seq(s"ghi${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row4", false) + ) + + // Validate data skipping with filters on secondary key column + spark.sql("set hoodie.metadata.enable=true") + spark.sql("set hoodie.enable.data.skipping=true") + spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict") + checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where not_record_key_col = 'ghi'")( + Seq(3, "row3", "ghi", "p2"), + Seq(4, "row4", "ghi", "p2") + ) + + // Update partition path - move row3 from p2 to p3 using MERGE INTO + // This moves a record from file group A to file group B + spark.sql( + s""" + |MERGE INTO $tableName AS target + |USING (SELECT 3 as ts, 'row3' as record_key_col, 'ghi' as not_record_key_col, 'p3' as partition_key_col) AS source + |ON target.record_key_col = source.record_key_col + |WHEN MATCHED THEN UPDATE SET * + """.stripMargin) + + // Validate the secondary index records after partition update + checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( + Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false), + Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), + Seq(s"ghi${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false), + Seq(s"ghi${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row4", false) + ) + + // Validate data after partition update + checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where not_record_key_col = 'ghi'")( + Seq(3, "row3", "ghi", "p3"), + Seq(4, "row4", "ghi", "p2") + ) + + // Validate all records are in correct partitions + checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName order by record_key_col")( + Seq(1, "row1", "abc", "p1"), + Seq(2, "row2", "def", "p2"), + Seq(3, "row3", "ghi", "p3"), + Seq(4, "row4", "ghi", "p2") + ) + + // Update both secondary column value and partition path + // This moves a record from file group A to file group B + spark.sql( + s""" + |MERGE INTO $tableName AS target + |USING (SELECT 4 as ts, 'row4' as record_key_col, 'jkl' as not_record_key_col, 'p3' as partition_key_col) AS source + |ON target.record_key_col = source.record_key_col + |WHEN MATCHED THEN UPDATE SET * + """.stripMargin) + + checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( + Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false), + Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), + Seq(s"ghi${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false), + Seq(s"jkl${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row4", false) + ) + + // Validate data after partition update + checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where not_record_key_col = 'jkl'")( + Seq(4, "row4", "jkl", "p3") + ) + + // Validate all records are in correct partitions + checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName order by record_key_col")( + Seq(1, "row1", "abc", "p1"), + Seq(2, "row2", "def", "p2"), + Seq(3, "row3", "ghi", "p3"), + Seq(4, "row4", "jkl", "p3") + ) + } + private def deleteLastCompletedCommitFromTimeline(hudiOpts: Map[String, String], metaClient: HoodieTableMetaClient): Unit = { val lastInstant = metaClient.reloadActiveTimeline.getCommitsTimeline.lastInstant().get() assertTrue(hoodieStorage().deleteFile(new StoragePath(metaClient.getTimelinePath, HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.getFileName(lastInstant))))
