This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 87dd78c80de [HUDI-8343] Fix functional index update for clean operation (#12096)
87dd78c80de is described below
commit 87dd78c80de1ebc166fa2689016f29a55cd07485
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Oct 15 19:30:44 2024 +0530
[HUDI-8343] Fix functional index update for clean operation (#12096)
---
.../client/utils/SparkMetadataWriterUtils.java | 24 ++++--
.../hudi/metadata/HoodieTableMetadataUtil.java | 40 +++++++++-
.../hudi/common/testutils/HoodieTestUtils.java | 17 ++++
.../hudi/command/index/TestFunctionalIndex.scala | 91 ++++++++++++++++++++--
4 files changed, 159 insertions(+), 13 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 8e9f0696c8d..48bd0f9a64c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -161,7 +162,7 @@ public class SparkMetadataWriterUtils {
boolean isBaseFile) {
List<HoodieRecord> records = isBaseFile ? getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema)
: getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()), metaClient, schema);
- return dropMetaFields(toDataset(records, schema, sqlContext));
+ return dropMetaFields(toDataset(records, schema, sqlContext, isBaseFile));
}
private static List<HoodieRecord> getUnmergedLogFileRecords(List<String> logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) {
@@ -193,17 +194,26 @@ public class SparkMetadataWriterUtils {
}
}
- private static Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema, SQLContext sqlContext) {
+ private static Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema, SQLContext sqlContext, boolean isBaseFile) {
List<GenericRecord> avroRecords = records.stream()
- .map(r -> (GenericRecord) r.getData())
+ .map(r -> {
+ if (isBaseFile) {
+ return (GenericRecord) r.getData();
+ }
+ HoodieRecordPayload payload = (HoodieRecordPayload) r.getData();
+ try {
+ return (GenericRecord) payload.getInsertValue(schema).get();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to extract Avro payload", e);
+ }
+ })
.collect(Collectors.toList());
if (avroRecords.isEmpty()) {
return sqlContext.emptyDataFrame().toDF();
}
- try (JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkContext())) {
- JavaRDD<GenericRecord> javaRDD = jsc.parallelize(avroRecords);
- return AvroConversionUtils.createDataFrame(javaRDD.rdd(), schema.toString(), sqlContext.sparkSession());
- }
+ JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkContext());
+ JavaRDD<GenericRecord> javaRDD = jsc.parallelize(avroRecords);
+ return AvroConversionUtils.createDataFrame(javaRDD.rdd(), schema.toString(), sqlContext.sparkSession());
}
private static <T extends Comparable<T>> HoodieColumnRangeMetadata<Comparable> computeColumnRangeMetadata(Dataset<Row> rowDataset,
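
Two things are going on in the toDataset hunks above. First, records scanned from log files wrap their data in a HoodieRecordPayload, so the Avro record has to be unwrapped via getInsertValue(schema), whereas base-file records already expose a GenericRecord. Second, the try-with-resources block was dropped because JavaSparkContext implements Closeable and its close() stops the underlying SparkContext, which here belongs to the active session. A minimal standalone sketch of that second pitfall (class and app names are illustrative, not part of the patch):

    import java.util.Arrays;

    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SharedContextSketch {
      public static void main(String[] args) {
        SparkContext sc = new SparkContext("local[1]", "shared-context-sketch");
        // Wrong: try-with-resources would call jsc.close(), which stops the
        // shared SparkContext, so any later job on the same context fails.
        // try (JavaSparkContext jsc = new JavaSparkContext(sc)) { ... }
        // Right: wrap without closing; whoever created the SparkContext
        // stops it exactly once at shutdown.
        JavaSparkContext jsc = new JavaSparkContext(sc);
        long count = jsc.parallelize(Arrays.asList(1, 2, 3)).count();
        System.out.println("count = " + count); // prints: count = 3
        sc.stop();
      }
    }
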
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 286187a38ed..30afca04e8c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -53,6 +53,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -571,10 +572,47 @@ public class HoodieTableMetadataUtil {
dataMetaClient, isColumnStatsIndexEnabled, columnStatsIndexParallelism, targetColumnsForColumnStatsIndex);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), metadataColumnStatsRDD);
}
+ if (enabledPartitionTypes.contains(MetadataPartitionType.FUNCTIONAL_INDEX)) {
+ convertMetadataToFunctionalIndexRecords(engineContext, cleanMetadata, instantTime, dataMetaClient, bloomIndexParallelism, columnStatsIndexParallelism, partitionToRecordsMap);
+ }
return partitionToRecordsMap;
}
+ private static void convertMetadataToFunctionalIndexRecords(HoodieEngineContext engineContext, HoodieCleanMetadata cleanMetadata,
+ String instantTime, HoodieTableMetaClient dataMetaClient,
+ int bloomIndexParallelism, int columnStatsIndexParallelism,
+ Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap) {
+ Option<HoodieIndexMetadata> indexMetadata = dataMetaClient.getIndexMetadata();
+ if (indexMetadata.isPresent()) {
+ HoodieIndexMetadata metadata = indexMetadata.get();
+ Map<String, HoodieIndexDefinition> indexDefinitions = metadata.getIndexDefinitions();
+ if (indexDefinitions.isEmpty()) {
+ throw new HoodieMetadataException("Functional index metadata not found");
+ }
+ // iterate over each index definition and check:
+ // if it is a functional index using column_stats, then follow the same approach as column_stats
+ // if it is a functional index using bloom_filters, then follow the same approach as bloom_filters
+ // else throw an exception
+ for (Map.Entry<String, HoodieIndexDefinition> entry : indexDefinitions.entrySet()) {
+ String indexName = entry.getKey();
+ HoodieIndexDefinition indexDefinition = entry.getValue();
+ if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(MetadataPartitionType.fromPartitionPath(indexDefinition.getIndexName()))) {
+ if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
+ partitionToRecordsMap.put(indexName, convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, bloomIndexParallelism));
+ } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+ partitionToRecordsMap.put(indexName,
+ convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, dataMetaClient, true, columnStatsIndexParallelism, indexDefinition.getSourceFields()));
+ } else {
+ throw new HoodieMetadataException("Unsupported functional index type");
+ }
+ }
+ }
+ } else {
+ throw new HoodieMetadataException("Functional index metadata not found");
+ }
+ }
+
/**
* Finds all files that were deleted as part of a clean and creates metadata table records for them.
*
@@ -1192,7 +1230,7 @@ public class HoodieTableMetadataUtil {
List<String> columnsToIndex,
boolean isDeleted) {
String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
- String fileName = FSUtils.getFileName(filePath, partitionPath);
+ String fileName = filePartitionPath.substring(filePartitionPath.lastIndexOf("/") + 1);
if (isDeleted) {
// TODO we should delete records instead of stubbing them
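
The fileName change in the last hunk above replaces FSUtils.getFileName, which needs the partition path as a second argument, with a plain basename extraction that depends only on the recorded file path. A small self-contained sketch of the same substring logic (class, method, and file names are illustrative):

    public class BasenameSketch {
      // Mirrors the replacement logic: trim one leading '/', then take
      // everything after the last '/'. lastIndexOf returns -1 when there is
      // no separator, so "+ 1" yields the whole string in that case.
      static String fileName(String filePath) {
        String p = filePath.startsWith("/") ? filePath.substring(1) : filePath;
        return p.substring(p.lastIndexOf("/") + 1);
      }

      public static void main(String[] args) {
        // Illustrative paths, not taken from the patch:
        System.out.println(fileName("/price=1000/f1-abc_0-1-2_001.parquet")); // f1-abc_0-1-2_001.parquet
        System.out.println(fileName("f1-abc_0-1-2_001.parquet"));             // unchanged
      }
    }
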
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index acee42be055..ef9a47e86ae 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.testutils;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -29,7 +30,9 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.HoodieStorage;
@@ -373,4 +376,18 @@ public class HoodieTestUtils {
throw new HoodieIOException("Failed to get instant file status", e);
}
}
+
+ /**
+ * Gets the pair of partition and cleaned file from the clean metadata.
+ */
+ public static List<Pair<String, String>> getCleanedFiles(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant) throws IOException {
+ HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, cleanInstant);
+ List<Pair<String, String>> deleteFileList = new ArrayList<>();
+ cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
+ // Files deleted from a partition
+ List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
+ deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
+ });
+ return deleteFileList;
+ }
}
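
A rough sketch of how the new helper is meant to be used from a test body, kept in Java for consistency with the helper itself; it assumes an initialized metaClient for a table that has completed at least one clean, which is exactly what the Scala test below sets up:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.testutils.HoodieTestUtils;
    import org.apache.hudi.common.util.collection.Pair;

    public class CleanedFilesSketch {
      // Sketch only: prints what the helper returns for the latest clean.
      static void printCleanedFiles(HoodieTableMetaClient metaClient) throws IOException {
        HoodieInstant lastClean = metaClient.reloadActiveTimeline()
            .getCleanerTimeline()
            .lastInstant()
            .get(); // the test asserts presence before dereferencing
        List<Pair<String, String>> cleaned = HoodieTestUtils.getCleanedFiles(metaClient, lastClean);
        for (Pair<String, String> p : cleaned) {
          // each entry is (partition, deleted file name)
          System.out.println("partition=" + p.getKey() + ", deletedFile=" + p.getValue());
        }
      }
    }
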
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 5fb046faee4..de20cfb6fa2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.common.util.Option
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig}
import org.apache.hudi.hive.HiveSyncConfigHolder._
import org.apache.hudi.hive.testutils.HiveTestUtil
import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
@@ -536,11 +537,6 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
Seq(3, "a2"),
Seq(4, "a2")
)
- // verify there are no new updates to functional index
- checkAnswer(metadataSql)(
- Seq("2020-09-26", "2021-09-26"),
- Seq("2022-09-26", "2022-09-26")
- )
// enable functional index
spark.sql(s"set
${HoodieMetadataConfig.FUNCTIONAL_INDEX_ENABLE_PROP.key}=true")
@@ -557,12 +553,97 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
checkAnswer(metadataSql)(
Seq("2020-09-26", "2021-09-26"),
Seq("2022-09-26", "2022-09-26"),
+ Seq("2022-09-26", "2022-09-26"), // for file in name=a2
Seq("2024-09-26", "2024-09-26") // for file in name=a3
)
}
}
}
+ // Test functional index using column stats and bloom filters, then clean the older file versions and check that the index is correct.
+ test("Test Functional Index With Cleaning") {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ Seq(true, false).foreach { isPartitioned =>
+ val tableName = generateTableName + s"_clean_$tableType$isPartitioned"
+ val partitionByClause = if (isPartitioned) "partitioned by(price)" else ""
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | price int
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts',
+ | hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+ | ${HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key} = '1'
+ | )
+ | $partitionByClause
+ | location '$basePath'
+ """.stripMargin)
+ if (tableType == "mor") {
+ spark.sql("set hoodie.compact.inline=true")
+ spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+ }
+ if (!isPartitioned) {
+ // setting this for non-partitioned table to ensure multiple file groups are created
+ spark.sql(s"set ${HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key()}=0")
+ }
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2020-09-26
+ spark.sql(s"insert into $tableName values(1, 'a1', 1601098924,
10)")
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2021-09-26
+ spark.sql(s"insert into $tableName values(2, 'a2', 1632634924,
100)")
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2022-09-26
+ spark.sql(s"insert into $tableName values(3, 'a3', 1664170924,
1000)")
+ // create functional index
+ spark.sql(s"create index idx_datestr on $tableName using
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+ // validate index created successfully
+ val metaClient = createMetaClient(spark, basePath)
+ assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+ assertTrue(metaClient.getIndexMetadata.isPresent)
+ assertEquals(1, metaClient.getIndexMetadata.get.getIndexDefinitions.size())
+
+ // verify functional index records by querying metadata table
+ val metadataSql = s"select ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') where type=3"
+ checkAnswer(metadataSql)(
+ Seq("2020-09-26", "2020-09-26"), // for file in price=10
+ Seq("2021-09-26", "2021-09-26"), // for file in price=100
+ Seq("2022-09-26", "2022-09-26") // for file in price=1000
+ )
+
+ // get file name for price=1000
+ val fileNames = spark.sql(s"select ColumnStatsMetadata.fileName from hudi_metadata('$tableName') where type=3 and ColumnStatsMetadata.minValue.member6.value='2022-09-26'").collect()
+ assertEquals(1, fileNames.length)
+ val fileName = fileNames(0).getString(0)
+
+ // update the record with id=3
+ // produce two versions so that the older version can be cleaned
+ spark.sql(s"update $tableName set ts=1695706924 where id=3")
+ spark.sql(s"update $tableName set ts=1727329324 where id=3")
+
+ // check cleaning completed
+ val lastCleanInstant = metaClient.reloadActiveTimeline().getCleanerTimeline.lastInstant()
+ assertTrue(lastCleanInstant.isPresent)
+ // verify that file for price=1000 is cleaned
+ assertTrue(HoodieTestUtils.getCleanedFiles(metaClient, lastCleanInstant.get()).get(0).getValue.equals(fileName))
+
+ // verify there are new updates to the functional index, with isDeleted=true for the cleaned file
+ checkAnswer(s"select ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value, ColumnStatsMetadata.isDeleted from hudi_metadata('$tableName') where type=3 and ColumnStatsMetadata.fileName='$fileName'")(
+ Seq("2022-09-26", "2022-09-26", false),
+ Seq(null, null, true) // for the cleaned file
+ )
+ }
+ }
+ }
+ }
+ }
+
private def assertTableIdentifier(catalogTable: CatalogTable,
expectedDatabaseName: String,
expectedTableName: String): Unit = {
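
Finally, a quick standalone check of the epoch-second literals used in the test's inserts and updates. The formatter here is pinned to UTC, while from_unixtime uses the Spark session time zone, so a different zone can shift a date by one day (class name is illustrative):

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    public class EpochLiteralCheck {
      public static void main(String[] args) {
        DateTimeFormatter fmt =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
        // 2020-09-26, 2021-09-26, 2022-09-26 seed the index; the last two
        // updates create new file versions whose predecessors get cleaned.
        long[] ts = {1601098924L, 1632634924L, 1664170924L, 1695706924L, 1727329324L};
        for (long t : ts) {
          System.out.println(t + " -> " + fmt.format(Instant.ofEpochSecond(t)));
        }
      }
    }
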