This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b4451c05ce2 [HUDI-7582] Fix functional index lookup (#11021)
b4451c05ce2 is described below
commit b4451c05ce2a959a93eb443870005839ae52ec77
Author: bhat-vinay <[email protected]>
AuthorDate: Tue Apr 16 13:46:49 2024 +0530
[HUDI-7582] Fix functional index lookup (#11021)
The read path for a functional index leads to an NPE, which causes
file-pruning to be skipped entirely. This PR fixes the issue as follows:
1. Parse the query filter to extract the function name and target column.
2. Check the functional index definitions; if a matching index exists, use it
to skip files.
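For example (mirroring the new test in this PR; table name is illustrative),
a query of the following shape can now be pruned using the functional index:

    create index idx_datestr on tbl using column_stats(ts)
      options(func='from_unixtime', format='yyyy-MM-dd')
    select * from tbl where from_unixtime(ts, 'yyyy-MM-dd') < '1970-01-03'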
---------
Co-authored-by: Vinaykumar Bhat <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
---
.../HoodieSparkFunctionalIndex.java | 51 +++++++++----------
.../SparkHoodieBackedTableMetadataWriter.java | 2 +-
.../functional/TestHoodieSparkFunctionalIndex.java | 1 +
.../index/functional/HoodieFunctionalIndex.java | 25 ++++++++++
.../org/apache/hudi/FunctionalIndexSupport.scala | 58 ++++++++++++++++++++++
.../scala/org/apache/hudi/HoodieFileIndex.scala | 8 +--
.../hudi/command/index/TestFunctionalIndex.scala | 56 +++++++++++++++++++++
7 files changed, 172 insertions(+), 29 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/functional/HoodieSparkFunctionalIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndex.java
similarity index 84%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/functional/HoodieSparkFunctionalIndex.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndex.java
index dcc4b8f3926..5196f67baf3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/functional/HoodieSparkFunctionalIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndex.java
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.hudi.index.functional;
+package org.apache.hudi;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.index.functional.HoodieFunctionalIndex;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;
@@ -44,69 +45,69 @@ public class HoodieSparkFunctionalIndex implements HoodieFunctionalIndex<Column,
* NOTE: This is not an exhaustive list of spark-sql functions. Only the common date/timestamp and string functions have been added.
* Add more functions as needed; however, each key must match the exact spark-sql function name in lowercase.
*/
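// Illustrative sketch only (not part of this commit): a new mapping would follow the same shape,
// e.g. Pair.of("dayofweek", (columns, options) -> functions.dayofweek(columns.get(0)))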
- private static final Map<String, SparkFunction> SPARK_FUNCTION_MAP = CollectionUtils.createImmutableMap(
+ public static final Map<String, SparkFunction> SPARK_FUNCTION_MAP = CollectionUtils.createImmutableMap(
// Date/Timestamp functions
- Pair.of("date_format", (columns, options) -> {
+ Pair.of(SPARK_DATE_FORMAT, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("DATE_FORMAT requires 1 column");
}
return functions.date_format(columns.get(0), options.get("format"));
}),
- Pair.of("day", (columns, options) -> {
+ Pair.of(SPARK_DAY, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("DAY requires 1 column");
}
return functions.dayofmonth(columns.get(0));
}),
- Pair.of("year", (columns, options) -> {
+ Pair.of(SPARK_YEAR, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("YEAR requires 1 column");
}
return functions.year(columns.get(0));
}),
- Pair.of("month", (columns, options) -> {
+ Pair.of(SPARK_MONTH, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("MONTH requires 1 column");
}
return functions.month(columns.get(0));
}),
- Pair.of("hour", (columns, options) -> {
+ Pair.of(SPARK_HOUR, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("HOUR requires 1 column");
}
return functions.hour(columns.get(0));
}),
- Pair.of("from_unixtime", (columns, options) -> {
+ Pair.of(SPARK_FROM_UNIXTIME, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("FROM_UNIXTIME requires 1
column");
}
return functions.from_unixtime(columns.get(0), options.get("format"));
}),
- Pair.of("unix_timestamp", (columns, options) -> {
+ Pair.of(SPARK_UNIX_TIMESTAMP, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("UNIX_TIMESTAMP requires 1
column");
}
return functions.unix_timestamp(columns.get(0), options.get("format"));
}),
- Pair.of("to_date", (columns, options) -> {
+ Pair.of(SPARK_TO_DATE, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("TO_DATE requires 1 column");
}
return functions.to_date(columns.get(0));
}),
- Pair.of("to_timestamp", (columns, options) -> {
+ Pair.of(SPARK_TO_TIMESTAMP, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("TO_TIMESTAMP requires 1 column");
}
return functions.to_timestamp(columns.get(0));
}),
- Pair.of("date_add", (columns, options) -> {
+ Pair.of(SPARK_DATE_ADD, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("DATE_ADD requires 1 column");
}
return functions.date_add(columns.get(0), Integer.parseInt(options.get("days")));
}),
- Pair.of("date_sub", (columns, options) -> {
+ Pair.of(SPARK_DATE_SUB, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("DATE_SUB requires 1 column");
}
@@ -114,73 +115,73 @@ public class HoodieSparkFunctionalIndex implements HoodieFunctionalIndex<Column,
}),
// String functions
- Pair.of("concat", (columns, options) -> {
+ Pair.of(SPARK_CONCAT, (columns, options) -> {
if (columns.size() < 2) {
throw new IllegalArgumentException("CONCAT requires at least 2
columns");
}
return functions.concat(columns.toArray(new Column[0]));
}),
- Pair.of("substring", (columns, options) -> {
+ Pair.of(SPARK_SUBSTRING, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("SUBSTRING requires 1 column");
}
return functions.substring(columns.get(0), Integer.parseInt(options.get("pos")), Integer.parseInt(options.get("len")));
}),
- Pair.of("lower", (columns, options) -> {
+ Pair.of(SPARK_LOWER, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("LOWER requires 1 column");
}
return functions.lower(columns.get(0));
}),
- Pair.of("upper", (columns, options) -> {
+ Pair.of(SPARK_UPPER, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("UPPER requires 1 column");
}
return functions.upper(columns.get(0));
}),
- Pair.of("trim", (columns, options) -> {
+ Pair.of(SPARK_TRIM, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("TRIM requires 1 column");
}
return functions.trim(columns.get(0));
}),
- Pair.of("ltrim", (columns, options) -> {
+ Pair.of(SPARK_LTRIM, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("LTRIM requires 1 column");
}
return functions.ltrim(columns.get(0));
}),
- Pair.of("rtrim", (columns, options) -> {
+ Pair.of(SPARK_RTRIM, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("RTRIM requires 1 column");
}
return functions.rtrim(columns.get(0));
}),
- Pair.of("length", (columns, options) -> {
+ Pair.of(SPARK_LENGTH, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("LENGTH requires 1 column");
}
return functions.length(columns.get(0));
}),
- Pair.of("regexp_replace", (columns, options) -> {
+ Pair.of(SPARK_REGEXP_REPLACE, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("REGEXP_REPLACE requires 1
column");
}
return functions.regexp_replace(columns.get(0), options.get("pattern"), options.get("replacement"));
}),
- Pair.of("regexp_extract", (columns, options) -> {
+ Pair.of(SPARK_REGEXP_EXTRACT, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("REGEXP_EXTRACT requires 1
column");
}
return functions.regexp_extract(columns.get(0), options.get("pattern"), Integer.parseInt(options.get("idx")));
}),
- Pair.of("split", (columns, options) -> {
+ Pair.of(SPARK_SPLIT, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("SPLIT requires 1 column");
}
return functions.split(columns.get(0), options.get("pattern"));
}),
- Pair.of("identity", (columns, options) -> {
+ Pair.of(SPARK_IDENTITY, (columns, options) -> {
if (columns.size() != 1) {
throw new IllegalArgumentException("IDENTITY requires 1 column");
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index e02f703a8e3..4feefdc5b5a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -38,7 +38,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.functional.HoodieFunctionalIndex;
-import org.apache.hudi.index.functional.HoodieSparkFunctionalIndex;
+import org.apache.hudi.HoodieSparkFunctionalIndex;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.MetricsReporterType;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkFunctionalIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkFunctionalIndex.java
index 85754fe8762..afb1419a73b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkFunctionalIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkFunctionalIndex.java
@@ -19,6 +19,7 @@
package org.apache.hudi.index.functional;
+import org.apache.hudi.HoodieSparkFunctionalIndex;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.sql.Column;
diff --git a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java b/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java
index 2465731cc8d..0ca71cf3aac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java
@@ -30,6 +30,31 @@ import java.util.List;
* @param <T> The target type after applying the transformation. Represents the type of the indexed value.
*/
public interface HoodieFunctionalIndex<S, T> extends Serializable {
+
+ public static final String SPARK_DATE_FORMAT = "date_format";
+ public static final String SPARK_DAY = "day";
+ public static final String SPARK_MONTH = "month";
+ public static final String SPARK_YEAR = "year";
+ public static final String SPARK_HOUR = "hour";
+ public static final String SPARK_FROM_UNIXTIME = "from_unixtime";
+ public static final String SPARK_UNIX_TIMESTAMP = "unix_timestamp";
+ public static final String SPARK_TO_DATE = "to_date";
+ public static final String SPARK_TO_TIMESTAMP = "to_timestamp";
+ public static final String SPARK_DATE_ADD = "date_add";
+ public static final String SPARK_DATE_SUB = "date_sub";
+ public static final String SPARK_CONCAT = "concat";
+ public static final String SPARK_SUBSTRING = "substring";
+ public static final String SPARK_UPPER = "upper";
+ public static final String SPARK_LOWER = "lower";
+ public static final String SPARK_TRIM = "trim";
+ public static final String SPARK_LTRIM = "ltrim";
+ public static final String SPARK_RTRIM = "rtrim";
+ public static final String SPARK_LENGTH = "length";
+ public static final String SPARK_REGEXP_REPLACE = "regexp_replace";
+ public static final String SPARK_REGEXP_EXTRACT = "regexp_extract";
+ public static final String SPARK_SPLIT = "split";
+ public static final String SPARK_IDENTITY = "identity";
+
/**
* Get the name of the index.
*
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
index 0dbfc1193fd..035eeabb4d5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
@@ -22,6 +22,7 @@ package org.apache.hudi
import org.apache.hadoop.fs.FileStatus
import org.apache.hudi.FunctionalIndexSupport._
import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieSparkFunctionalIndex.SPARK_FUNCTION_MAP
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, HoodieMetadataRecord}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -113,6 +114,63 @@ class FunctionalIndexSupport(spark: SparkSession,
.select(requiredIndexColumns: _*)
}
+ /**
+ * Searches for an index partition based on the specified index function and target column name.
+ *
+ * This method looks up the index definitions available in the metadata of a `metaClient` instance
+ * and attempts to find an index partition where the index function and the source fields match
+ * the provided arguments. If a matching index definition is found, the partition identifier for
+ * that index is returned.
+ *
+ * @param queryFilters A sequence of `Expression` objects to analyze. Each expression should involve a single column
+ *                     for the method to consider it (expressions involving multiple columns are skipped).
+ * @return An `Option` containing the index partition identifier if a matching index definition is found.
+ *         Returns `None` if no matching index definition is found.
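+ *
+ * For illustration (hypothetical values, not from this commit): given the filter
+ * `from_unixtime(ts, 'yyyy-MM-dd') < '2024-01-01'` and an index defined with function
+ * `from_unixtime` on source field `ts`, this returns `Some` of that index's partition identifier.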
+ */
+ def getFunctionalIndexPartition(queryFilters: Seq[Expression]): Option[String] = {
+ val functionToColumnNames = extractSparkFunctionNames(queryFilters)
+ if (functionToColumnNames.nonEmpty) {
+ // Currently, only one functional index in the query is supported. See HUDI-7620 for supporting multiple functions.
+ checkState(functionToColumnNames.size == 1, "Currently, only one function with functional index in the query is supported")
+ val (indexFunction, targetColumnName) = functionToColumnNames.head
+ val indexDefinitions = metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions
+ // Return the partition of the first index definition whose function and source field match
+ indexDefinitions.asScala.collectFirst {
+   case (indexPartition, indexDefinition)
+     if indexDefinition.getIndexFunction.equals(indexFunction) && indexDefinition.getSourceFields.contains(targetColumnName) =>
+     indexPartition
+ }
+ } else {
+ Option.empty
+ }
+ }
+
+ /**
+ * Extracts mappings from function names to column names from a sequence of expressions.
+ *
+ * This method iterates over a given sequence of Spark SQL expressions and identifies expressions
+ * that contain function calls corresponding to keys in the `SPARK_FUNCTION_MAP`. It supports only
+ * expressions that are simple binary expressions involving a single column. If an expression contains
+ * one of the functions and operates on a single column, this method maps the function name to the
+ * column name.
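+ *
+ * For illustration (hypothetical filter, not from this commit):
+ * Seq(`from_unixtime(ts, 'yyyy-MM-dd') < '2024-01-01'`) would yield Map("from_unixtime" -> "ts").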
+ */
+ private def extractSparkFunctionNames(queryFilters: Seq[Expression]): Map[String, String] = {
+ queryFilters.flatMap { expr =>
+ // Support only simple binary expression on single column
+ if (expr.references.size == 1) {
+ val targetColumnName = expr.references.head.name
+ // Check if the expression string contains any of the function names
+ val exprString = expr.toString
+ SPARK_FUNCTION_MAP.asScala.keys
+ .find(exprString.contains)
+ .map(functionName => functionName -> targetColumnName)
+ } else {
+ None // Skip expressions that do not match the criteria
+ }
+ }.toMap
+ }
+
def loadFunctionalIndexDataFrame(indexPartition: String,
shouldReadInMemory: Boolean): DataFrame = {
val colStatsDF = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index affed871cad..ee18abec504 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metadata.HoodieMetadataPayload
@@ -344,15 +345,16 @@ case class HoodieFileIndex(spark: SparkSession,
lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
lazy val (_, recordKeys) = recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
+ lazy val functionalIndexPartitionOpt = functionalIndex.getFunctionalIndexPartition(queryFilters)
if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
validateConfig()
Option.empty
} else if (recordKeys.nonEmpty) {
Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
- } else if (functionalIndex.isIndexAvailable && !queryFilters.isEmpty) {
- val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
+ } else if (functionalIndex.isIndexAvailable && queryFilters.nonEmpty && functionalIndexPartitionOpt.nonEmpty) {
val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns)
- val indexDf = functionalIndex.loadFunctionalIndexDataFrame("",
shouldReadInMemory)
+ val indexDf =
functionalIndex.loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get,
shouldReadInMemory)
+ val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
} else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
validateConfig()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 2a313f70461..e2a97c9adb0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -321,6 +321,62 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
}
}
+ test("Test Create Functional Index With Data Skipping") {
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ withTempDir { tmp =>
+ Seq("cow").foreach { tableType =>
+ val databaseName = "default"
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql("set hoodie.metadata.enable=true")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | price int
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(price)
+ | location '$basePath'
+ """.stripMargin)
+ spark.sql(s"insert into $tableName (id, name, ts, price) values(1,
'a1', 1000, 10)")
+ spark.sql(s"insert into $tableName (id, name, ts, price) values(2,
'a2', 200000, 100)")
+ spark.sql(s"insert into $tableName (id, name, ts, price) values(3,
'a3', 2000000000, 1000)")
+
+ var metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(spark.sessionState.newHadoopConf())
+ .build()
+
+ var createIndexSql = s"create index idx_datestr on $tableName using
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')"
+
+ spark.sql(createIndexSql)
+ spark.sql(s"select key, type, ColumnStatsMetadata from
hudi_metadata('$tableName') where type = 3").show(false)
+
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(spark.sessionState.newHadoopConf())
+ .build()
+ assertTrue(metaClient.getFunctionalIndexMetadata.isPresent)
+ var functionalIndexMetadata = metaClient.getFunctionalIndexMetadata.get()
+ assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+ assertEquals("func_index_idx_datestr",
functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+
+ checkAnswer(s"select id, name, price, ts, from_unixtime(ts,
'yyyy-MM-dd') from $tableName where from_unixtime(ts, 'yyyy-MM-dd') <
'1970-01-03'")(
+ Seq(1, "a1", 10, 1000, "1970-01-01")
+ )
+ }
+ }
+ }
+ }
+
private def assertTableIdentifier(catalogTable: CatalogTable,
expectedDatabaseName: String,
expectedTableName: String): Unit = {