This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 7b29afb415e [HUDI-8641] Ensure all binary expressions are correctly parsed for data skipping using expression index (#12455)
7b29afb415e is described below
commit 7b29afb415eee721306d5c27d1ef86d0d3bae0f3
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Dec 16 19:16:40 2024 +0530
    [HUDI-8641] Ensure all binary expressions are correctly parsed for data skipping using expression index (#12455)
    Different binary expressions take different options; e.g., from_unixtime
    takes a format option. We should be able to handle these options and apply
    predicates appropriately.
    All Spark functions have been converted to interfaces, and each implements
    the methods required by the SparkFunction interface.
    The logic for verifying that a user-provided Spark function conforms to the
    Spark function in the index definition lives in
    org.apache.hudi.ExpressionIndexSupport#extractQueryAndLiterals.
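    For illustration (not part of the patch; assumes caller code in the same
    package, since SparkFunction is package-private), resolving from_unixtime
    and applying it with its format option looks roughly like:

        // Sketch only: column name and option values are examples.
        ExpressionIndexSparkFunctions.SparkFunction fn =
            ExpressionIndexSparkFunctions.SparkFunction.getSparkFunction("from_unixtime");
        Map<String, String> opts = Collections.singletonMap("format", "yyyy-MM-dd");
        fn.validateOptions(opts); // rejects any key outside getValidOptions()
        Column indexedCol =
            fn.apply(Collections.singletonList(functions.col("ts")), opts); // from_unixtime(ts, 'yyyy-MM-dd')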
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../index/functional/BaseHoodieIndexClient.java | 3 +-
.../apache/hudi/HoodieSparkExpressionIndex.java | 236 --------
.../client/utils/SparkMetadataWriterUtils.java | 2 +-
.../expression/ExpressionIndexSparkFunctions.java | 609 +++++++++++++++++++++
.../expression/HoodieSparkExpressionIndex.java | 75 +++
.../SparkHoodieBackedTableMetadataWriter.java | 4 +-
.../TestHoodieSparkExpressionIndex.java | 3 +-
.../hudi/common/model/HoodieIndexDefinition.java | 44 ++
.../HoodieExpressionIndex.java | 33 +-
.../common/table/TestHoodieTableMetaClient.java | 3 +-
.../org/apache/hudi/HoodieSparkIndexClient.java | 13 +-
.../org/apache/hudi/ExpressionIndexSupport.scala | 104 ++--
.../org/apache/hudi/RecordLevelIndexSupport.scala | 14 +-
.../spark/sql/hudi/command/IndexCommands.scala | 15 +-
.../hudi/command/index/TestExpressionIndex.scala | 292 +++++++++-
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 27 +-
16 files changed, 1117 insertions(+), 360 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
index fe6c17a32ce..2fb4717d8a5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java
@@ -59,7 +59,8 @@ public abstract class BaseHoodieIndexClient {
/**
   * Create an expression index.
*/
-  public abstract void create(HoodieTableMetaClient metaClient, String indexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options) throws Exception;
+  public abstract void create(HoodieTableMetaClient metaClient, String indexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options,
+                              Map<String, String> tableProperties) throws Exception;
/**
* Drop an index. By default, ignore drop if index does not exist.
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkExpressionIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkExpressionIndex.java
deleted file mode 100644
index 27fe906c605..00000000000
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkExpressionIndex.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi;
-
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.index.functional.HoodieExpressionIndex;
-
-import org.apache.spark.sql.Column;
-import org.apache.spark.sql.functions;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public class HoodieSparkExpressionIndex implements HoodieExpressionIndex<Column, Column>, Serializable {
-
- /**
- * Custom interface to support Spark functions
- */
- @FunctionalInterface
- interface SparkFunction extends Serializable {
- Column apply(List<Column> columns, Map<String, String> options);
- }
-
-  /**
-   * Map of Spark functions to their implementations.
-   * NOTE: This is not an exhaustive list of spark-sql functions. Only the common date/timestamp and string functions have been added.
-   * Add more functions as needed. However, the key should match the exact spark-sql function name in lowercase.
-   */
-  public static final Map<String, SparkFunction> SPARK_FUNCTION_MAP = CollectionUtils.createImmutableMap(
- // Date/Timestamp functions
- Pair.of(SPARK_DATE_FORMAT, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("DATE_FORMAT requires 1 column");
- }
- return functions.date_format(columns.get(0), options.get("format"));
- }),
- Pair.of(SPARK_DAY, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("DAY requires 1 column");
- }
- return functions.dayofmonth(columns.get(0));
- }),
- Pair.of(SPARK_YEAR, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("YEAR requires 1 column");
- }
- return functions.year(columns.get(0));
- }),
- Pair.of(SPARK_MONTH, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("MONTH requires 1 column");
- }
- return functions.month(columns.get(0));
- }),
- Pair.of(SPARK_HOUR, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("HOUR requires 1 column");
- }
- return functions.hour(columns.get(0));
- }),
-    Pair.of(SPARK_FROM_UNIXTIME, (columns, options) -> {
-      if (columns.size() != 1) {
-        throw new IllegalArgumentException("FROM_UNIXTIME requires 1 column");
-      }
-      return functions.from_unixtime(columns.get(0), options.get("format"));
-    }),
-    Pair.of(SPARK_UNIX_TIMESTAMP, (columns, options) -> {
-      if (columns.size() != 1) {
-        throw new IllegalArgumentException("UNIX_TIMESTAMP requires 1 column");
-      }
-      return functions.unix_timestamp(columns.get(0), options.get("format"));
-    }),
- Pair.of(SPARK_TO_DATE, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("TO_DATE requires 1 column");
- }
- return functions.to_date(columns.get(0));
- }),
- Pair.of(SPARK_TO_TIMESTAMP, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("TO_TIMESTAMP requires 1 column");
- }
- return functions.to_timestamp(columns.get(0));
- }),
-    Pair.of(SPARK_DATE_ADD, (columns, options) -> {
-      if (columns.size() != 1) {
-        throw new IllegalArgumentException("DATE_ADD requires 1 column");
-      }
-      return functions.date_add(columns.get(0), Integer.parseInt(options.get("days")));
-    }),
-    Pair.of(SPARK_DATE_SUB, (columns, options) -> {
-      if (columns.size() != 1) {
-        throw new IllegalArgumentException("DATE_SUB requires 1 column");
-      }
-      return functions.date_sub(columns.get(0), Integer.parseInt(options.get("days")));
-    }),
-
- // String functions
-    Pair.of(SPARK_CONCAT, (columns, options) -> {
-      if (columns.size() < 2) {
-        throw new IllegalArgumentException("CONCAT requires at least 2 columns");
-      }
-      return functions.concat(columns.toArray(new Column[0]));
-    }),
-    Pair.of(SPARK_SUBSTRING, (columns, options) -> {
-      if (columns.size() != 1) {
-        throw new IllegalArgumentException("SUBSTRING requires 1 column");
-      }
-      return functions.substring(columns.get(0), Integer.parseInt(options.get("pos")), Integer.parseInt(options.get("len")));
-    }),
- Pair.of(SPARK_LOWER, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("LOWER requires 1 column");
- }
- return functions.lower(columns.get(0));
- }),
- Pair.of(SPARK_UPPER, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("UPPER requires 1 column");
- }
- return functions.upper(columns.get(0));
- }),
- Pair.of(SPARK_TRIM, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("TRIM requires 1 column");
- }
- return functions.trim(columns.get(0));
- }),
- Pair.of(SPARK_LTRIM, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("LTRIM requires 1 column");
- }
- return functions.ltrim(columns.get(0));
- }),
- Pair.of(SPARK_RTRIM, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("RTRIM requires 1 column");
- }
- return functions.rtrim(columns.get(0));
- }),
- Pair.of(SPARK_LENGTH, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("LENGTH requires 1 column");
- }
- return functions.length(columns.get(0));
- }),
-    Pair.of(SPARK_REGEXP_REPLACE, (columns, options) -> {
-      if (columns.size() != 1) {
-        throw new IllegalArgumentException("REGEXP_REPLACE requires 1 column");
-      }
-      return functions.regexp_replace(columns.get(0), options.get("pattern"), options.get("replacement"));
-    }),
-    Pair.of(SPARK_REGEXP_EXTRACT, (columns, options) -> {
-      if (columns.size() != 1) {
-        throw new IllegalArgumentException("REGEXP_EXTRACT requires 1 column");
-      }
-      return functions.regexp_extract(columns.get(0), options.get("pattern"), Integer.parseInt(options.get("idx")));
-    }),
- Pair.of(SPARK_SPLIT, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("SPLIT requires 1 column");
- }
- return functions.split(columns.get(0), options.get("pattern"));
- }),
- Pair.of(IDENTITY_FUNCTION, (columns, options) -> {
- if (columns.size() != 1) {
- throw new IllegalArgumentException("IDENTITY requires 1 column");
- }
- return columns.get(0);
- })
- );
-
- private String indexName;
- private String indexFunction;
- private List<String> orderedSourceFields;
- private Map<String, String> options;
- private SparkFunction sparkFunction;
-
- public HoodieSparkExpressionIndex() {
- }
-
-  public HoodieSparkExpressionIndex(String indexName, String indexFunction, List<String> orderedSourceFields, Map<String, String> options) {
- this.indexName = indexName;
- this.indexFunction = indexFunction;
- this.orderedSourceFields = orderedSourceFields;
- this.options = options;
-
- // Check if the function from the expression exists in our map
- this.sparkFunction = SPARK_FUNCTION_MAP.get(indexFunction);
- if (this.sparkFunction == null) {
- throw new IllegalArgumentException("Unsupported Spark function: " +
indexFunction);
- }
- }
-
- @Override
- public String getIndexName() {
- return indexName;
- }
-
- @Override
- public String getIndexFunction() {
- return indexFunction;
- }
-
- @Override
- public List<String> getOrderedSourceFields() {
- return orderedSourceFields;
- }
-
- @Override
- public Column apply(List<Column> orderedSourceValues) {
- if (orderedSourceValues.size() != orderedSourceFields.size()) {
- throw new IllegalArgumentException("Mismatch in number of source values
and fields in the expression");
- }
- return sparkFunction.apply(orderedSourceValues, options);
- }
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index b19c3744628..eb466ee5252 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -37,7 +37,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.index.functional.HoodieExpressionIndex;
+import org.apache.hudi.index.expression.HoodieExpressionIndex;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.io.storage.HoodieIOFactory;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java
new file mode 100644
index 00000000000..c8026f8d30a
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/ExpressionIndexSparkFunctions.java
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.expression;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.functions;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.DAYS_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.EXPRESSION_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.FORMAT_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.LENGTH_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.PATTERN_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.POSITION_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.REGEX_GROUP_INDEX_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.REPLACEMENT_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.TRIM_STRING_OPTION;
+
+public class ExpressionIndexSparkFunctions {
+
+ private static final String SPARK_DATE_FORMAT = "date_format";
+ private static final String SPARK_DAY = "day";
+ private static final String SPARK_MONTH = "month";
+ private static final String SPARK_YEAR = "year";
+ private static final String SPARK_HOUR = "hour";
+ private static final String SPARK_FROM_UNIXTIME = "from_unixtime";
+ private static final String SPARK_UNIX_TIMESTAMP = "unix_timestamp";
+ private static final String SPARK_TO_DATE = "to_date";
+ private static final String SPARK_TO_TIMESTAMP = "to_timestamp";
+ private static final String SPARK_DATE_ADD = "date_add";
+ private static final String SPARK_DATE_SUB = "date_sub";
+ private static final String SPARK_SUBSTRING = "substring";
+ private static final String SPARK_UPPER = "upper";
+ private static final String SPARK_LOWER = "lower";
+ private static final String SPARK_TRIM = "trim";
+ private static final String SPARK_LTRIM = "ltrim";
+ private static final String SPARK_RTRIM = "rtrim";
+ private static final String SPARK_LENGTH = "length";
+ private static final String SPARK_REGEXP_REPLACE = "regexp_replace";
+ private static final String SPARK_REGEXP_EXTRACT = "regexp_extract";
+ private static final String SPARK_SPLIT = "split";
+ public static final String IDENTITY_FUNCTION = "identity";
+
+  private static final Map<String, SparkFunction> SPARK_FUNCTION_MAP = new HashMap<>();
+ static {
+ SPARK_FUNCTION_MAP.put(IDENTITY_FUNCTION, new SparkIdentityFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_SPLIT, new SparkSplitFunction(){});
+    SPARK_FUNCTION_MAP.put(SPARK_REGEXP_EXTRACT, new SparkRegexExtractFunction(){});
+    SPARK_FUNCTION_MAP.put(SPARK_REGEXP_REPLACE, new SparkRegexReplaceFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_LENGTH, new SparkLengthFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_RTRIM, new SparkRTrimFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_LTRIM, new SparkLTrimFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_TRIM, new SparkTrimFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_LOWER, new SparkLowerFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_UPPER, new SparkUpperFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_SUBSTRING, new SparkSubstringFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_DATE_ADD, new SparkDateAddFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_DATE_SUB, new SparkDateSubFunction(){});
+    SPARK_FUNCTION_MAP.put(SPARK_TO_TIMESTAMP, new SparkToTimestampFunction(){});
+    SPARK_FUNCTION_MAP.put(SPARK_TO_DATE, new SparkToDateFunction(){});
+    SPARK_FUNCTION_MAP.put(SPARK_UNIX_TIMESTAMP, new SparkUnixTimestampFunction(){});
+    SPARK_FUNCTION_MAP.put(SPARK_FROM_UNIXTIME, new SparkFromUnixTimeFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_HOUR, new SparkHourFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_YEAR, new SparkYearFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_MONTH, new SparkMonthFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_DAY, new SparkDayFunction(){});
+ SPARK_FUNCTION_MAP.put(SPARK_DATE_FORMAT, new SparkDateFormatFunction(){});
+ }
+
+ /**
+ * Custom interface to support Spark functions
+ */
+ interface SparkFunction extends Serializable {
+
+ String getFunctionName();
+
+ Set<String> getValidOptions();
+
+ Column apply(List<Column> columns, Map<String, String> options);
+
+ default void validateOptions(Map<String, String> options) {
+ Set<String> validOptions = getValidOptions();
+ Set<String> invalidOptions = new HashSet<>(options.keySet());
+ invalidOptions.removeAll(validOptions);
+      ValidationUtils.checkArgument(invalidOptions.isEmpty(), String.format("Input options %s are not valid for spark function %s", invalidOptions, this));
+ }
+
+ static SparkFunction getSparkFunction(String functionName) {
+ return SPARK_FUNCTION_MAP.get(functionName);
+ }
+ }
+
+ interface SparkIdentityFunction extends SparkFunction {
+
+ default String getFunctionName() {
+ return IDENTITY_FUNCTION;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return Collections.singleton(EXPRESSION_OPTION);
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("IDENTITY requires 1 column");
+ }
+ return columns.get(0);
+ }
+ }
+
+ interface SparkSplitFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_SPLIT;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, PATTERN_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("SPLIT requires 1 column");
+ }
+ return functions.split(columns.get(0), options.get(PATTERN_OPTION));
+ }
+ }
+
+ interface SparkRegexExtractFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_REGEXP_EXTRACT;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+      return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, PATTERN_OPTION, REGEX_GROUP_INDEX_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("REGEXP_EXTRACT requires 1 column");
+ }
+      return functions.regexp_extract(columns.get(0), options.get(PATTERN_OPTION), Integer.parseInt(options.get(REGEX_GROUP_INDEX_OPTION)));
+ }
+ }
+
+ interface SparkRegexReplaceFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_REGEXP_REPLACE;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+      return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, PATTERN_OPTION, REPLACEMENT_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("REGEXP_REPLACE requires 1 column");
+ }
+      return functions.regexp_replace(columns.get(0), options.get(PATTERN_OPTION), options.get(REPLACEMENT_OPTION));
+ }
+ }
+
+ interface SparkLengthFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_LENGTH;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return Collections.singleton(EXPRESSION_OPTION);
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("LENGTH requires 1 column");
+ }
+ return functions.length(columns.get(0));
+ }
+ }
+
+ interface SparkRTrimFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_RTRIM;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+      return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, TRIM_STRING_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("RTRIM requires 1 column");
+ }
+ if (options.containsKey(TRIM_STRING_OPTION)) {
+        return functions.rtrim(columns.get(0), options.get(TRIM_STRING_OPTION));
+ } else {
+ return functions.rtrim(columns.get(0));
+ }
+ }
+ }
+
+ interface SparkLTrimFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_LTRIM;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+      return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, TRIM_STRING_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("LTRIM requires 1 column");
+ }
+ if (options.containsKey(TRIM_STRING_OPTION)) {
+        return functions.ltrim(columns.get(0), options.get(TRIM_STRING_OPTION));
+ } else {
+ return functions.ltrim(columns.get(0));
+ }
+ }
+ }
+
+ interface SparkTrimFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_TRIM;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+      return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, TRIM_STRING_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("TRIM requires 1 column");
+ }
+ if (options.containsKey(TRIM_STRING_OPTION)) {
+ return functions.trim(columns.get(0), options.get(TRIM_STRING_OPTION));
+ } else {
+ return functions.trim(columns.get(0));
+ }
+ }
+ }
+
+ interface SparkLowerFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_LOWER;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return Collections.singleton(EXPRESSION_OPTION);
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("LOWER requires 1 column");
+ }
+ return functions.lower(columns.get(0));
+ }
+ }
+
+ interface SparkUpperFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_UPPER;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return Collections.singleton(EXPRESSION_OPTION);
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("UPPER requires 1 column");
+ }
+ return functions.upper(columns.get(0));
+ }
+ }
+
+ interface SparkSubstringFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_SUBSTRING;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+      return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, POSITION_OPTION, LENGTH_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("SUBSTRING requires 1 column");
+ }
+      return functions.substring(columns.get(0), Integer.parseInt(options.get(POSITION_OPTION)), Integer.parseInt(options.get(LENGTH_OPTION)));
+ }
+ }
+
+ interface SparkDateAddFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_DATE_ADD;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, DAYS_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("DATE_ADD requires 1 column");
+ }
+      return functions.date_add(columns.get(0), Integer.parseInt(options.get(DAYS_OPTION)));
+ }
+ }
+
+ interface SparkDateSubFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_DATE_SUB;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, DAYS_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("DATE_SUB requires 1 column");
+ }
+      return functions.date_sub(columns.get(0), Integer.parseInt(options.get(DAYS_OPTION)));
+ }
+ }
+
+ interface SparkToTimestampFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_TO_TIMESTAMP;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, FORMAT_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("TO_TIMESTAMP requires 1 column");
+ }
+ if (options.containsKey(FORMAT_OPTION)) {
+        return functions.to_timestamp(columns.get(0), options.get(FORMAT_OPTION));
+ } else {
+ return functions.to_timestamp(columns.get(0));
+ }
+ }
+ }
+
+ interface SparkToDateFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_TO_DATE;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, FORMAT_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("TO_DATE requires 1 column");
+ }
+ if (options.containsKey(FORMAT_OPTION)) {
+ return functions.to_date(columns.get(0), options.get(FORMAT_OPTION));
+ } else {
+ return functions.to_date(columns.get(0));
+ }
+ }
+ }
+
+ interface SparkUnixTimestampFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_UNIX_TIMESTAMP;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, FORMAT_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("UNIX_TIMESTAMP requires 1 column");
+ }
+ if (options.containsKey(FORMAT_OPTION)) {
+        return functions.unix_timestamp(columns.get(0), options.get(FORMAT_OPTION));
+ } else {
+ return functions.unix_timestamp(columns.get(0));
+ }
+ }
+ }
+
+ interface SparkFromUnixTimeFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_FROM_UNIXTIME;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, FORMAT_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("FROM_UNIXTIME requires 1 column");
+ }
+ if (options.containsKey(FORMAT_OPTION)) {
+        return functions.from_unixtime(columns.get(0), options.get(FORMAT_OPTION));
+ } else {
+ return functions.from_unixtime(columns.get(0));
+ }
+ }
+ }
+
+ interface SparkHourFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_HOUR;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return Collections.singleton(EXPRESSION_OPTION);
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("HOUR requires 1 column");
+ }
+ return functions.hour(columns.get(0));
+ }
+ }
+
+ interface SparkYearFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_YEAR;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return Collections.singleton(EXPRESSION_OPTION);
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("YEAR requires 1 column");
+ }
+ return functions.year(columns.get(0));
+ }
+ }
+
+ interface SparkMonthFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_MONTH;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return Collections.singleton(EXPRESSION_OPTION);
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("MONTH requires 1 column");
+ }
+ return functions.month(columns.get(0));
+ }
+ }
+
+ interface SparkDayFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_DAY;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return Collections.singleton(EXPRESSION_OPTION);
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("DAY requires 1 column");
+ }
+ return functions.dayofmonth(columns.get(0));
+ }
+ }
+
+ interface SparkDateFormatFunction extends SparkFunction {
+
+ @Override
+ default String getFunctionName() {
+ return SPARK_DATE_FORMAT;
+ }
+
+ @Override
+ default Set<String> getValidOptions() {
+ return new HashSet<>(Arrays.asList(EXPRESSION_OPTION, FORMAT_OPTION));
+ }
+
+ @Override
+ default Column apply(List<Column> columns, Map<String, String> options) {
+ if (columns.size() != 1) {
+ throw new IllegalArgumentException("DATE_FORMAT requires 1 column");
+ }
+ if (!options.containsKey(FORMAT_OPTION)) {
+ throw new IllegalArgumentException("DATE_FORMAT requires format
option");
+ }
+ return functions.date_format(columns.get(0), options.get(FORMAT_OPTION));
+ }
+ }
+}
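For reference, a sketch of the pattern the file above establishes: each
function is an interface extending SparkFunction with default methods, and is
registered as an anonymous instance in SPARK_FUNCTION_MAP. The "reverse"
function below is hypothetical and only illustrates the shape:

    // Hypothetical additional function following the pattern above; not in the patch.
    interface SparkReverseFunction extends ExpressionIndexSparkFunctions.SparkFunction {
      @Override
      default String getFunctionName() {
        return "reverse"; // hypothetical function name
      }

      @Override
      default Set<String> getValidOptions() {
        return Collections.singleton(HoodieExpressionIndex.EXPRESSION_OPTION);
      }

      @Override
      default Column apply(List<Column> columns, Map<String, String> options) {
        if (columns.size() != 1) {
          throw new IllegalArgumentException("REVERSE requires 1 column");
        }
        return functions.reverse(columns.get(0));
      }
    }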
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java
new file mode 100644
index 00000000000..ac03713c84c
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/expression/HoodieSparkExpressionIndex.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index.expression;
+
+import org.apache.spark.sql.Column;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class HoodieSparkExpressionIndex implements HoodieExpressionIndex<Column, Column>, Serializable {
+
+ private String indexName;
+ private String indexFunction;
+ private List<String> orderedSourceFields;
+ private Map<String, String> options;
+ private ExpressionIndexSparkFunctions.SparkFunction sparkFunction;
+
+ public HoodieSparkExpressionIndex() {
+ }
+
+  public HoodieSparkExpressionIndex(String indexName, String indexFunction, List<String> orderedSourceFields, Map<String, String> options) {
+ this.indexName = indexName;
+ this.indexFunction = indexFunction;
+ this.orderedSourceFields = orderedSourceFields;
+ this.options = options;
+
+ // Check if the function from the expression exists in our map
+    this.sparkFunction = ExpressionIndexSparkFunctions.SparkFunction.getSparkFunction(indexFunction);
+ if (this.sparkFunction == null) {
+ throw new IllegalArgumentException("Unsupported Spark function: " +
indexFunction);
+ }
+ }
+
+ @Override
+ public String getIndexName() {
+ return indexName;
+ }
+
+ @Override
+ public String getIndexFunction() {
+ return indexFunction;
+ }
+
+ @Override
+ public List<String> getOrderedSourceFields() {
+ return orderedSourceFields;
+ }
+
+ @Override
+ public Column apply(List<Column> orderedSourceValues) {
+ if (orderedSourceValues.size() != orderedSourceFields.size()) {
+ throw new IllegalArgumentException("Mismatch in number of source values
and fields in the expression");
+ }
+ sparkFunction.validateOptions(options);
+ return sparkFunction.apply(orderedSourceValues, options);
+ }
+}
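A brief usage sketch of the class above (names and values are examples):

    // Builds from_unixtime(ts, 'yyyy-MM-dd') as the indexed expression.
    Map<String, String> options = Collections.singletonMap("format", "yyyy-MM-dd");
    HoodieSparkExpressionIndex index = new HoodieSparkExpressionIndex(
        "expr_index_datestr", "from_unixtime",
        Collections.singletonList("ts"), options);
    // apply() validates the options against the function's valid set before evaluating.
    Column result = index.apply(Collections.singletonList(functions.col("ts")));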
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 5bf12c205ac..c3fa8269bb7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -19,7 +19,7 @@
package org.apache.hudi.metadata;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.HoodieSparkExpressionIndex;
+import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -41,7 +41,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.index.functional.HoodieExpressionIndex;
+import org.apache.hudi.index.expression.HoodieExpressionIndex;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.storage.StorageConfiguration;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkExpressionIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java
similarity index 98%
rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkExpressionIndex.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java
index cf8b7fa537e..2ed36e555a8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/functional/TestHoodieSparkExpressionIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/expression/TestHoodieSparkExpressionIndex.java
@@ -17,9 +17,8 @@
* under the License.
*/
-package org.apache.hudi.index.functional;
+package org.apache.hudi.index.expression;
-import org.apache.hudi.HoodieSparkExpressionIndex;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.sql.Column;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java
index e54182bc847..cbbbd730dba 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java
@@ -28,6 +28,14 @@ import java.util.StringJoiner;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.DAYS_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.FORMAT_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.REGEX_GROUP_INDEX_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.LENGTH_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.PATTERN_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.POSITION_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.REPLACEMENT_OPTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.TRIM_STRING_OPTION;
/**
* Class representing the metadata for a functional or secondary index in Hudi.
@@ -73,6 +81,42 @@ public class HoodieIndexDefinition implements Serializable {
return indexOptions;
}
+ public String getExpressionIndexFormatOption(String defaultValue) {
+ return indexOptions.getOrDefault(FORMAT_OPTION, defaultValue);
+ }
+
+ public String getExpressionIndexFormatOption() {
+ return indexOptions.get(FORMAT_OPTION);
+ }
+
+ public String getExpressionIndexDaysOption() {
+ return indexOptions.get(DAYS_OPTION);
+ }
+
+ public String getExpressionIndexPositionOption() {
+ return indexOptions.get(POSITION_OPTION);
+ }
+
+ public String getExpressionIndexLengthOption() {
+ return indexOptions.get(LENGTH_OPTION);
+ }
+
+ public String getExpressionIndexPatternOption() {
+ return indexOptions.get(PATTERN_OPTION);
+ }
+
+ public String getExpressionIndexReplacementOption() {
+ return indexOptions.get(REPLACEMENT_OPTION);
+ }
+
+ public String getExpressionIndexIndexOption() {
+ return indexOptions.get(REGEX_GROUP_INDEX_OPTION);
+ }
+
+ public String getExpressionIndexTrimStringOption() {
+ return indexOptions.get(TRIM_STRING_OPTION);
+ }
+
public String getIndexName() {
return indexName;
}
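The new getters give callers typed access to the stored index options; a small
sketch (the default shown is an example value):

    // Falls back to the supplied default when no format option was stored.
    String format = indexDefinition.getExpressionIndexFormatOption("yyyy-MM-dd HH:mm:ss");
    String days = indexDefinition.getExpressionIndexDaysOption(); // null if unset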
diff --git a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieExpressionIndex.java b/hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java
similarity index 71%
rename from hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieExpressionIndex.java
rename to hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java
index fda8841c35b..1da229e3836 100644
--- a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieExpressionIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/index/expression/HoodieExpressionIndex.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.hudi.index.functional;
+package org.apache.hudi.index.expression;
import java.io.Serializable;
import java.util.List;
@@ -34,31 +34,16 @@ public interface HoodieExpressionIndex<S, T> extends Serializable {
   String HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH = "_hoodie_expression_index_relative_file_path";
   String HOODIE_EXPRESSION_INDEX_PARTITION = "_hoodie_expression_index_partition";
   String HOODIE_EXPRESSION_INDEX_FILE_SIZE = "_hoodie_expression_index_file_size";
- String SPARK_DATE_FORMAT = "date_format";
- String SPARK_DAY = "day";
- String SPARK_MONTH = "month";
- String SPARK_YEAR = "year";
- String SPARK_HOUR = "hour";
- String SPARK_FROM_UNIXTIME = "from_unixtime";
- String SPARK_UNIX_TIMESTAMP = "unix_timestamp";
- String SPARK_TO_DATE = "to_date";
- String SPARK_TO_TIMESTAMP = "to_timestamp";
- String SPARK_DATE_ADD = "date_add";
- String SPARK_DATE_SUB = "date_sub";
- String SPARK_CONCAT = "concat";
- String SPARK_SUBSTRING = "substring";
- String SPARK_UPPER = "upper";
- String SPARK_LOWER = "lower";
- String SPARK_TRIM = "trim";
- String SPARK_LTRIM = "ltrim";
- String SPARK_RTRIM = "rtrim";
- String SPARK_LENGTH = "length";
- String SPARK_REGEXP_REPLACE = "regexp_replace";
- String SPARK_REGEXP_EXTRACT = "regexp_extract";
- String SPARK_SPLIT = "split";
- String IDENTITY_FUNCTION = "identity";
String EXPRESSION_OPTION = "expr";
+ String TRIM_STRING_OPTION = "trimString";
+ String REGEX_GROUP_INDEX_OPTION = "idx";
+ String REPLACEMENT_OPTION = "replacement";
+ String PATTERN_OPTION = "pattern";
+ String LENGTH_OPTION = "len";
+ String POSITION_OPTION = "pos";
+ String DAYS_OPTION = "days";
+ String FORMAT_OPTION = "format";
/**
* Get the name of the index.
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index 37cd96cc6ff..f411f8e4977 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -45,7 +45,6 @@ import java.util.Properties;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static org.apache.hudi.index.functional.HoodieExpressionIndex.IDENTITY_FUNCTION;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -263,7 +262,7 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
Map<String, Map<String, String>> columnsMap = new HashMap<>();
columnsMap.put("c1", Collections.emptyMap());
     String indexName = MetadataPartitionType.EXPRESSION_INDEX.getPartitionPath() + "idx";
-    HoodieIndexDefinition indexDefinition = new HoodieIndexDefinition(indexName, "column_stats", IDENTITY_FUNCTION,
+    HoodieIndexDefinition indexDefinition = new HoodieIndexDefinition(indexName, "column_stats", "identity",
         new ArrayList<>(columnsMap.keySet()), Collections.emptyMap());
metaClient.buildIndexDefinition(indexDefinition);
assertTrue(metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(indexName));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
index 9c10527906c..fd6f0ddd740 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java
@@ -56,8 +56,8 @@ import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
-import static org.apache.hudi.index.functional.HoodieExpressionIndex.EXPRESSION_OPTION;
-import static org.apache.hudi.index.functional.HoodieExpressionIndex.IDENTITY_FUNCTION;
+import static org.apache.hudi.index.expression.ExpressionIndexSparkFunctions.IDENTITY_FUNCTION;
+import static org.apache.hudi.index.expression.HoodieExpressionIndex.EXPRESSION_OPTION;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_EXPRESSION_INDEX_PREFIX;
@@ -80,10 +80,11 @@ public class HoodieSparkIndexClient extends BaseHoodieIndexClient {
}
@Override
-  public void create(HoodieTableMetaClient metaClient, String userIndexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options) throws Exception {
+  public void create(HoodieTableMetaClient metaClient, String userIndexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options,
+                     Map<String, String> tableProperties) throws Exception {
     if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX) || indexType.equals(PARTITION_NAME_BLOOM_FILTERS)
         || indexType.equals(PARTITION_NAME_COLUMN_STATS)) {
-      createExpressionOrSecondaryIndex(metaClient, userIndexName, indexType, columns, options);
+      createExpressionOrSecondaryIndex(metaClient, userIndexName, indexType, columns, options, tableProperties);
     } else {
       createRecordIndex(metaClient, userIndexName, indexType);
     }
@@ -117,7 +118,7 @@ public class HoodieSparkIndexClient extends BaseHoodieIndexClient {
}
   private void createExpressionOrSecondaryIndex(HoodieTableMetaClient metaClient, String userIndexName, String indexType,
-                                                Map<String, Map<String, String>> columns, Map<String, String> options) throws Exception {
+                                                Map<String, Map<String, String>> columns, Map<String, String> options, Map<String, String> tableProperties) throws Exception {
String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
: PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
@@ -126,7 +127,7 @@ public class HoodieSparkIndexClient extends
BaseHoodieIndexClient {
}
     checkArgument(columns.size() == 1, "Only one column can be indexed for functional or secondary index.");
-    if (!isEligibleForIndexing(metaClient, indexType, options, columns)) {
+    if (!isEligibleForIndexing(metaClient, indexType, tableProperties, columns)) {
       throw new HoodieMetadataIndexException("Not eligible for indexing: " + indexType + ", indexName: " + userIndexName);
     }
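A sketch of a call site for the widened create(...) signature (the indexClient
variable, index name, and option values are hypothetical):

    Map<String, Map<String, String>> columns =
        Collections.singletonMap("ts", Collections.emptyMap());
    Map<String, String> options = new HashMap<>();
    options.put("expr", "from_unixtime");
    options.put("format", "yyyy-MM-dd");
    // tableProperties now carries the table-level configs consulted by
    // isEligibleForIndexing, kept separate from the index options.
    indexClient.create(metaClient, "idx_datestr", "column_stats", columns, options,
        new HashMap<>());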
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index 154498b57ec..4067ee6dc90 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -23,23 +23,21 @@ import org.apache.hudi.ColumnStatsIndexSupport.{composeColumnStatStructType, des
import org.apache.hudi.ExpressionIndexSupport._
 import org.apache.hudi.HoodieCatalystUtils.{withPersistedData, withPersistedDataset}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.HoodieSparkExpressionIndex.SPARK_FUNCTION_MAP
-import org.apache.hudi.RecordLevelIndexSupport.{fetchQueryWithAttribute, filterQueryWithRecordKey}
+import org.apache.hudi.RecordLevelIndexSupport.filterQueryWithRecordKey
 import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, HoodieMetadataRecord}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
 import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.collection
import org.apache.hudi.common.util.hash.{ColumnIndexID, PartitionIndexID}
+import org.apache.hudi.common.util.{StringUtils, collection}
import org.apache.hudi.data.HoodieJavaRDD
-import org.apache.hudi.index.functional.HoodieExpressionIndex.SPARK_FROM_UNIXTIME
 import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.util.JFunction
 import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD, createDataFrameFromRows}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, FromUnixTime, In, Literal, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.{DateAdd, DateFormatClass, DateSub, EqualTo, Expression, FromUnixTime, In, Literal, ParseToDate, ParseToTimestamp, RegExpExtract, RegExpReplace, StringSplit, StringTrim, StringTrimLeft, StringTrimRight, Substring, UnaryExpression, UnixTimestamp}
import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
@@ -54,10 +52,11 @@ import scala.collection.parallel.mutable.ParHashMap
class ExpressionIndexSupport(spark: SparkSession,
tableSchema: StructType,
metadataConfig: HoodieMetadataConfig,
- metaClient: HoodieTableMetaClient)
+ metaClient: HoodieTableMetaClient,
+ allowCaching: Boolean = false)
extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) {
-  @transient private lazy val cachedColumnStatsIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap()
+  @transient private lazy val cachedExpressionIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap()
   // NOTE: Since [[metadataConfig]] is transient this has to be eagerly persisted, before this will be passed on to the executor
@@ -102,8 +101,8 @@ class ExpressionIndexSupport(spark: SparkSession,
def loadTransposed[T](targetColumns: Seq[String],
shouldReadInMemory: Boolean,
colStatRecords: HoodieData[HoodieMetadataColumnStats],
-                        expressionIndexQuery: Expression) (block: DataFrame => T): T = {
-    cachedColumnStatsIndexViews.get(targetColumns) match {
+                        expressionIndexQuery: Expression)(block: DataFrame => T): T = {
+    cachedExpressionIndexViews.get(targetColumns) match {
case Some(cachedDF) =>
block(cachedDF)
case None =>
@@ -121,9 +120,8 @@ class ExpressionIndexSupport(spark: SparkSession,
spark.createDataFrame(rdd, indexSchema)
}
- val allowCaching: Boolean = false
if (allowCaching) {
- cachedColumnStatsIndexViews.put(targetColumns, df)
+ cachedExpressionIndexViews.put(targetColumns, df)
          // NOTE: Instead of collecting the rows from the index and hold them in memory, we instead rely
          // on Spark as (potentially distributed) cache managing data lifecycle, while we simply keep
// the referenced to persisted [[DataFrame]] instance
@@ -323,8 +321,8 @@ class ExpressionIndexSupport(spark: SparkSession,
}
override def invalidateCaches(): Unit = {
- cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
- cachedColumnStatsIndexViews.clear()
+ cachedExpressionIndexViews.foreach { case (_, df) => df.unpersist() }
+ cachedExpressionIndexViews.clear()
}
/**
@@ -334,16 +332,10 @@ class ExpressionIndexSupport(spark: SparkSession,
     metadataConfig.isEnabled && metaClient.getIndexMetadata.isPresent && !metaClient.getIndexMetadata.get().getIndexDefinitions.isEmpty
}
-  private def filterQueriesWithFunctionalFilterKey(queryFilters: Seq[Expression], sourceFieldOpt: Option[String]): List[Tuple2[Expression, List[String]]] = {
+  private def filterQueriesWithFunctionalFilterKey(queryFilters: Seq[Expression], sourceFieldOpt: Option[String],
+                                                   attributeFetcher: Function1[Expression, Expression]): List[Tuple2[Expression, List[String]]] = {
     var expressionIndexQueries: List[Tuple2[Expression, List[String]]] = List.empty
for (query <- queryFilters) {
- val attributeFetcher = (expr: Expression) => {
- expr match {
- case expression: UnaryExpression => expression.child
- case expression: FromUnixTime => expression.sec
- case other => other
- }
- }
       filterQueryWithRecordKey(query, sourceFieldOpt, attributeFetcher).foreach({
case (exp: Expression, literals: List[String]) =>
           expressionIndexQueries = expressionIndexQueries :+ Tuple2.apply(exp, literals)
@@ -389,37 +381,59 @@ class ExpressionIndexSupport(spark: SparkSession,
   * Extracts mappings from function names to column names from a sequence of expressions.
   *
   * This method iterates over a given sequence of Spark SQL expressions and identifies expressions
-   * that contain function calls corresponding to keys in the `SPARK_FUNCTION_MAP`. It supports only
+   * that contain function calls corresponding to keys in the ExpressionIndexFunction. It supports only
   * expressions that are simple binary expressions involving a single column. If an expression contains
   * one of the functions and operates on a single column, this method maps the function name to the
   * column name.
   */
   private def extractQueryAndLiterals(queryFilters: Seq[Expression], indexDefinition: HoodieIndexDefinition): Option[(Expression, List[String])] = {
-    val expressionIndexQueries = filterQueriesWithFunctionalFilterKey(queryFilters, Option.apply(indexDefinition.getSourceFields.get(0)))
+    val attributeFetcher = (expr: Expression) => {
+      expr match {
+        case expression: UnaryExpression => expression.child
+        case expression: DateFormatClass if expression.right.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexFormatOption()) =>
+          expression.left
+        case expression: FromUnixTime
+          if expression.format.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexFormatOption(TimestampFormatter.defaultPattern())) =>
+          expression.sec
+        case expression: UnixTimestamp if expression.right.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexFormatOption(TimestampFormatter.defaultPattern())) =>
+          expression.timeExp
+        case expression: ParseToDate if (expression.format.isEmpty && StringUtils.isNullOrEmpty(indexDefinition.getExpressionIndexFormatOption()))
+          || (expression.format.isDefined && expression.format.get.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexFormatOption())) =>
+          expression.left
+        case expression: ParseToTimestamp if (expression.format.isEmpty && StringUtils.isNullOrEmpty(indexDefinition.getExpressionIndexFormatOption()))
+          || (expression.format.isDefined && expression.format.get.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexFormatOption())) =>
+          expression.left
+        case expression: DateAdd if expression.days.isInstanceOf[Literal] && expression.days.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexDaysOption) =>
+          expression.startDate
+        case expression: DateSub if expression.days.isInstanceOf[Literal] && expression.days.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexDaysOption) =>
+          expression.startDate
+        case expression: Substring if expression.pos.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexPositionOption)
+          && expression.len.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexLengthOption) =>
+          expression.str
+        case expression: StringTrim if (expression.trimStr.isEmpty && StringUtils.isNullOrEmpty(indexDefinition.getExpressionIndexTrimStringOption))
+          || (expression.trimStr.isDefined && expression.trimStr.get.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexTrimStringOption)) => expression.srcStr
+        case expression: StringTrimLeft if (expression.trimStr.isEmpty && StringUtils.isNullOrEmpty(indexDefinition.getExpressionIndexTrimStringOption))
+          || (expression.trimStr.isDefined && expression.trimStr.get.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexTrimStringOption)) => expression.srcStr
+        case expression: StringTrimRight if (expression.trimStr.isEmpty && StringUtils.isNullOrEmpty(indexDefinition.getExpressionIndexTrimStringOption))
+          || (expression.trimStr.isDefined && expression.trimStr.get.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexTrimStringOption)) => expression.srcStr
+        case expression: RegExpReplace if expression.pos.asInstanceOf[Literal].value.toString.equals("1")
+          && expression.regexp.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexPatternOption)
+          && expression.rep.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexReplacementOption) =>
+          expression.subject
+        case expression: RegExpExtract if expression.regexp.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexPatternOption)
+          && expression.idx.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexIndexOption) =>
+          expression.subject
+        case expression: StringSplit if expression.limit.asInstanceOf[Literal].value.toString.equals("-1")
+          && expression.regex.asInstanceOf[Literal].value.toString.equals(indexDefinition.getExpressionIndexPatternOption) =>
+          expression.str
+        case other => other
+      }
+    }
+    val expressionIndexQueries = filterQueriesWithFunctionalFilterKey(queryFilters, Option.apply(indexDefinition.getSourceFields.get(0)), attributeFetcher)
var queryAndLiteralsOpt: Option[(Expression, List[String])] = Option.empty
expressionIndexQueries.foreach { tuple =>
val (expr, literals) = (tuple._1, tuple._2)
-      val functionNameOption = SPARK_FUNCTION_MAP.asScala.keys.find(expr.toString.contains)
-      val functionName = functionNameOption.getOrElse("identity")
-      if (indexDefinition.getIndexFunction.equals(functionName)) {
-        val attributeFetcher = (expr: Expression) => {
-          expr match {
-            case expression: UnaryExpression => expression.child
-            case expression: FromUnixTime => expression.sec
-            case other => other
-          }
-        }
-        if (functionName.equals(SPARK_FROM_UNIXTIME)) {
-          val configuredFormat = indexDefinition.getIndexOptions.getOrDefault("format", TimestampFormatter.defaultPattern)
-          if (expr.toString().contains(configuredFormat)) {
-            val pruningExpr = fetchQueryWithAttribute(expr, Option.apply(indexDefinition.getSourceFields.get(0)), RecordLevelIndexSupport.getSimpleLiteralGenerator(), attributeFetcher)._1.get._1
-            queryAndLiteralsOpt = Option.apply(Tuple2.apply(pruningExpr, literals))
-          }
-        } else {
-          val pruningExpr = fetchQueryWithAttribute(expr, Option.apply(indexDefinition.getSourceFields.get(0)), RecordLevelIndexSupport.getSimpleLiteralGenerator(), attributeFetcher)._1.get._1
-          queryAndLiteralsOpt = Option.apply(Tuple2.apply(pruningExpr, literals))
-        }
-      }
+      queryAndLiteralsOpt = Option.apply(Tuple2.apply(expr, literals))
}
queryAndLiteralsOpt
}
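For context on the rewrite above: instead of string-matching the expression's toString against
SPARK_FUNCTION_MAP as before, each supported function is now pattern-matched structurally, and
pruning proceeds only when the query's option literals (format, days, pattern, ...) equal those
stored in the index definition. A minimal illustrative sketch of that unwrapping idea (not part
of the patch; names simplified):

    import org.apache.spark.sql.catalyst.expressions.{Expression, FromUnixTime, Literal, UnaryExpression}

    def unwrapIfIndexed(expr: Expression, indexedFormat: String): Expression = expr match {
      // from_unixtime(ts, <fmt>) is prunable only when <fmt> matches the format
      // recorded in the index definition
      case f: FromUnixTime if f.format.isInstanceOf[Literal] &&
        f.format.asInstanceOf[Literal].value.toString == indexedFormat => f.sec
      case u: UnaryExpression => u.child // e.g. lower(col) -> col
      case other => other                // leave non-matching expressions intact
    }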
@@ -534,7 +548,7 @@ class ExpressionIndexSupport(spark: SparkSession,
}
object ExpressionIndexSupport {
- val INDEX_NAME = "FUNCTIONAL"
+ val INDEX_NAME = "EXPRESSION"
/**
 * Target Column Stats Index columns which internally are mapped onto fields of the corresponding
 * Column Stats record payload ([[HoodieMetadataColumnStats]]) persisted w/in Metadata Table
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 1cb20026876..ae4d6afef8e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -31,7 +31,7 @@ import org.apache.hudi.keygen.KeyGenUtils
import org.apache.hudi.metadata.HoodieTableMetadataUtil
import org.apache.hudi.storage.StoragePath
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, In, Literal}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Cast, EqualTo, Expression, In, Literal}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import scala.collection.JavaConverters._
@@ -165,7 +165,7 @@ object RecordLevelIndexSupport {
val literal = attributeLiteralTuple._2
     if (attribute != null && attribute.name != null && attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
       val recordKeyLiteral = literalGenerator.apply(attribute, literal)
-      (Option.apply(equalToQuery, List.apply(recordKeyLiteral)), true)
+      (Option.apply(EqualTo(attribute, literal), List.apply(recordKeyLiteral)), true)
} else {
(Option.empty, true)
}
@@ -196,7 +196,7 @@ object RecordLevelIndexSupport {
case _ => validINQuery = false
}
if (validINQuery) {
- (Option.apply(inQuery, literals), true)
+ (Option.apply(In(attributeOpt.get, inQuery.list), literals), true)
} else {
(Option.empty, true)
}
@@ -351,6 +351,14 @@ object RecordLevelIndexSupport {
case literal: Literal => expression2 match {
case attr: AttributeReference =>
Option.apply(attr, literal)
+        case cast: Cast if cast.child.isInstanceOf[AttributeReference] =>
+          Option.apply(cast.child.asInstanceOf[AttributeReference], literal)
+        case _ =>
+          Option.empty
+      }
+      case cast: Cast if cast.child.isInstanceOf[AttributeReference] => expression2 match {
+        case literal: Literal =>
+          Option.apply(cast.child.asInstanceOf[AttributeReference], literal)
        case _ =>
          Option.empty
      }
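Illustrative note on the Cast branches above (not part of the patch): a filter such as
cast(id as string) = '5' can still yield the (attribute, literal) pair needed for index
lookup, because the cast wrapped around the column reference is peeled off. A self-contained
sketch of the idea:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression, Literal}

    // Returns the attribute/literal pair if one side is a (possibly cast) column
    // reference and the other side is a literal; None otherwise.
    def attributeLiteralPair(e1: Expression, e2: Expression): Option[(AttributeReference, Literal)] =
      (e1, e2) match {
        case (attr: AttributeReference, lit: Literal) => Some((attr, lit))
        case (c: Cast, lit: Literal) => c.child match {
          case attr: AttributeReference => Some((attr, lit)) // peel the cast off the column
          case _ => None
        }
        case (lit: Literal, other) if !other.isInstanceOf[Literal] =>
          attributeLiteralPair(other, lit) // literal on the left: swap and retry
        case _ => None
      }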
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
index 7d030709a02..aa7f45dac26 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
@@ -26,7 +26,8 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
 import org.apache.hudi.exception.HoodieIndexException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.index.functional.HoodieExpressionIndex.{EXPRESSION_OPTION, IDENTITY_FUNCTION}
+import org.apache.hudi.index.expression.ExpressionIndexSparkFunctions
+import org.apache.hudi.index.expression.HoodieExpressionIndex.EXPRESSION_OPTION
 import org.apache.hudi.metadata.{HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -52,23 +53,20 @@ case class CreateIndexCommand(table: CatalogTable,
     val columnsMap: java.util.LinkedHashMap[String, java.util.Map[String, String]] =
       new util.LinkedHashMap[String, java.util.Map[String, String]]()
     columns.map(c => columnsMap.put(c._1.mkString("."), c._2.asJava))
-    val extraOpts = options ++ table.properties
     if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) || indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
       // validate that only overwrite-with-latest payloads can enable SI
       if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) &&
-        extraOpts.asJava.getOrDefault(EXPRESSION_OPTION, IDENTITY_FUNCTION).equals(IDENTITY_FUNCTION)) {
+        options.asJava.getOrDefault(EXPRESSION_OPTION, ExpressionIndexSparkFunctions.IDENTITY_FUNCTION).equals(ExpressionIndexSparkFunctions.IDENTITY_FUNCTION)) {
         throw new HoodieIndexException("Column stats index without expression on any column can be created using datasource configs. " +
           "Please refer https://hudi.apache.org/docs/metadata for more info")
       }
-      new HoodieSparkIndexClient(sparkSession).create(
-        metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
+      new HoodieSparkIndexClient(sparkSession).create(metaClient, indexName, indexType, columnsMap, options.asJava, table.properties.asJava)
     } else if (indexName.equals(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) {
       ValidationUtils.checkArgument(CreateIndexCommand.matchesRecordKeys(columnsMap.keySet().asScala.toSet, metaClient.getTableConfig),
         "Input columns should match configured record key columns: " + metaClient.getTableConfig.getRecordKeyFieldProp)
-      new HoodieSparkIndexClient(sparkSession).create(
-        metaClient, indexName, HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX, columnsMap, extraOpts.asJava)
+      new HoodieSparkIndexClient(sparkSession).create(metaClient, indexName, HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX, columnsMap, options.asJava, table.properties.asJava)
     } else if (StringUtils.isNullOrEmpty(indexType)) {
       val columnNames = columnsMap.keySet().asScala.toSet
       val derivedIndexType: String = if (CreateIndexCommand.matchesRecordKeys(columnNames, metaClient.getTableConfig)) {
@@ -82,8 +80,7 @@ case class CreateIndexCommand(table: CatalogTable,
         throw new HoodieIndexException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST")
       }
     }
-      new HoodieSparkIndexClient(sparkSession).create(
-        metaClient, indexName, derivedIndexType, columnsMap, extraOpts.asJava)
+      new HoodieSparkIndexClient(sparkSession).create(metaClient, indexName, derivedIndexType, columnsMap, options.asJava, table.properties.asJava)
     } else {
       throw new HoodieIndexException(String.format("%s is not supported", indexType))
     }
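Separating options from table.properties here keeps catalog properties out of the per-function
option validation exercised by the new testInvalidOptions test below. A minimal sketch of the
distinction (illustrative values only, not part of the patch):

    val options = Map("expr" -> "from_unixtime", "format" -> "yyyy-MM-dd") // validated against the spark function
    val tableProperties = Map("primaryKey" -> "id", "type" -> "cow")       // passed through separately
    assert(!options.contains("primaryKey")) // the old merged extraOpts map would have contained it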
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
index efd461fe249..e6329809628 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
@@ -35,8 +35,7 @@ import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, Hoodie
 import org.apache.hudi.hive.testutils.HiveTestUtil
 import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
 import org.apache.hudi.index.HoodieIndex
-import org.apache.hudi.index.functional.HoodieExpressionIndex
-import org.apache.hudi.index.functional.HoodieExpressionIndex.EXPRESSION_OPTION
+import org.apache.hudi.index.expression.{HoodieExpressionIndex, HoodieSparkExpressionIndex}
 import org.apache.hudi.metadata.{HoodieMetadataFileSystemView, MetadataPartitionType}
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
@@ -190,7 +189,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
       assertResult("idx_datestr")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
       assertResult("column_stats")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
       assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists)
-      assertResult(Map(EXPRESSION_OPTION -> "from_unixtime", "format" -> "yyyy-MM-dd"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
+      assertResult(Map(HoodieExpressionIndex.EXPRESSION_OPTION -> "from_unixtime", "format" -> "yyyy-MM-dd"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
       logicalPlan = sqlParser.parsePlan(s"create index idx_name on $tableName using bloom_filters(name) options(expr='lower')")
       resolvedLogicalPlan = analyzer.execute(logicalPlan)
@@ -198,7 +197,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
       assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
       assertResult("bloom_filters")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
       assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists)
-      assertResult(Map(EXPRESSION_OPTION -> "lower"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
+      assertResult(Map(HoodieExpressionIndex.EXPRESSION_OPTION -> "lower"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
}
}
}
@@ -745,15 +744,17 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
   }
   /**
-   * Test expression index with data skipping for unary expression and binary expression.
+   * Test expression index with invalid options.
    */
   @Test
-  def testColumnStatsPruningWithUnaryBinaryExpr(): Unit = {
+  def testInvalidOptions(): Unit = {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName + s"_stats_pruning_binary_$tableType"
         val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
         spark.sql(
           s"""
              CREATE TABLE $tableName (
@@ -762,6 +763,8 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
              | rider STRING,
              | driver STRING,
              | fare DOUBLE,
+             | dateDefault STRING,
+             | date STRING,
              | city STRING,
              | state STRING
              |) USING HUDI
@@ -783,28 +786,223 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
         spark.sql(
           s"""
-             |insert into $tableName(ts, id, rider, driver, fare, city, state) VALUES
-             |    (1695414527,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
-             |    (1695414531,'trip6','rider-C','driver-K',17.14,'san_diego','california'),
-             |    (1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
-             |    (1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas')
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, date, city, state) VALUES
+             |    (1695414520,'trip2','rider-C','driver-M',27.70,'2024-11-30 01:30:40', '2024-11-30', 'sunnyvale','california'),
+             |    (1699349649,'trip5','rider-A','driver-Q',3.32, '2019-11-30 01:30:40', '2019-11-30', 'san_diego','texas')
              |""".stripMargin)
+
+        // With invalid options
+        checkNestedExceptionContains(s"create index idx_datestr on $tableName using column_stats(ts) options(expr='from_unixtime', invalidOp='random')")(
+          "Input options [invalidOp] are not valid for spark function"
+        )
+      }
+    }
+  }
+
+  /**
+   * Test expression index with data skipping for date and timestamp based expressions.
+   */
+  @Test
+  def testColumnStatsPruningWithDateTimestampExpressions(): Unit = {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName + s"_stats_pruning_date_expr_$tableType"
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
         spark.sql(
           s"""
-             |insert into $tableName(ts, id, rider, driver, fare, city, state) VALUES
-             |    (1695414520,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
-             |    (1699349649,'trip5','rider-A','driver-Q',3.32,'san_diego','texas')
+             CREATE TABLE $tableName (
+             | ts LONG,
+             | id STRING,
+             | rider STRING,
+             | driver STRING,
+             | fare DOUBLE,
+             | dateDefault STRING,
+             | date STRING,
+             | city STRING,
+             | state STRING
+             |) USING HUDI
+             |options(
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  hoodie.metadata.enable = 'true',
+             |  hoodie.datasource.write.recordkey.field = 'id',
+             |  hoodie.enable.data.skipping = 'true'
+             |)
+             |PARTITIONED BY (state)
+             |location '$basePath'
             |""".stripMargin)
-        // With unary expression
-        spark.sql(s"create index idx_rider on $tableName using column_stats(rider) options(expr='lower')")
+        spark.sql("set hoodie.parquet.small.file.limit=0")
+        if (HoodieSparkUtils.gteqSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled=false")
+        }
+
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, date, city, state) VALUES
+             |    (1695414527,'trip1','rider-A','driver-K',19.10, '2020-11-30 01:30:40', '2020-11-30', 'san_francisco','california'),
+             |    (1695414531,'trip6','rider-C','driver-K',17.14, '2021-11-30 01:30:40', '2021-11-30', 'san_diego','california'),
+             |    (1695332066,'trip3','rider-E','driver-O',93.50, '2022-11-30 01:30:40', '2022-11-30', 'austin','texas'),
+             |    (1695516137,'trip4','rider-F','driver-P',34.15, '2023-11-30 01:30:40', '2023-11-30', 'houston','texas')
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, date, city, state) VALUES
+             |    (1695414520,'trip2','rider-C','driver-M',27.70,'2024-11-30 01:30:40', '2024-11-30', 'sunnyvale','california'),
+             |    (1699349649,'trip5','rider-A','driver-Q',3.32, '2019-11-30 01:30:40', '2019-11-30', 'san_diego','texas')
+             |""".stripMargin)
+
+        val tableSchema: StructType =
+          StructType(
+            Seq(
+              StructField("ts", LongType),
+              StructField("id", StringType),
+              StructField("rider", StringType),
+              StructField("driver", StringType),
+              StructField("fare", DoubleType),
+              StructField("dateDefault", StringType),
+              StructField("date", StringType),
+              StructField("city", StringType),
+              StructField("state", StringType)
+            )
+          )
+        val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true")
+
         // With binary expression
         spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) options(expr='from_unixtime', format='yyyy-MM-dd')")
         // validate index created successfully
         var metaClient = createMetaClient(spark, basePath)
-        assertTrue(metaClient.getIndexMetadata.isPresent)
-        val expressionIndexMetadata = metaClient.getIndexMetadata.get()
-        assertEquals("expr_index_idx_datestr", expressionIndexMetadata.getIndexDefinitions.get("expr_index_idx_datestr").getIndexName)
+        val fromUnixTime = resolveExpr(spark, unapply(functions.from_unixtime(functions.col("ts"), "yyyy-MM-dd")).get, tableSchema)
+        var literal = Literal.create("2023-11-07")
+        var dataFilter = EqualTo(fromUnixTime, literal)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_datestr on $tableName")
+
+        spark.sql(s"create index idx_unix_default on $tableName using column_stats(dateDefault) options(expr='unix_timestamp')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val unixTimestampDefault = resolveExpr(spark, unapply(functions.unix_timestamp(functions.col("dateDefault"))).get, tableSchema)
+        literal = Literal.create(1606699840L)
+        dataFilter = EqualTo(unixTimestampDefault, literal)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_unix_default on $tableName")
+
+        spark.sql(s"create index idx_unix on $tableName using column_stats(date) options(expr='unix_timestamp', format='yyyy-MM-dd')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val unixTimestamp = resolveExpr(spark, unapply(functions.unix_timestamp(functions.col("date"), "yyyy-MM-dd")).get, tableSchema)
+        literal = Literal.create(1606694400L)
+        dataFilter = EqualTo(unixTimestamp, literal)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_unix on $tableName")
+
+        spark.sql(s"create index idx_to_date on $tableName using column_stats(date) options(expr='to_date', format='yyyy-MM-dd')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val toDate = resolveExpr(spark, unapply(functions.to_date(functions.col("date"), "yyyy-MM-dd")).get, tableSchema)
+        dataFilter = EqualTo(toDate, lit(18596).expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_to_date on $tableName")
+
+        spark.sql(s"create index idx_to_date_default on $tableName using column_stats(date) options(expr='to_date')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val toDateDefault = resolveExpr(spark, unapply(functions.to_date(functions.col("date"))).get, tableSchema)
+        dataFilter = EqualTo(toDateDefault, lit(18596).expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_to_date_default on $tableName")
+
+        spark.sql(s"create index idx_date_format on $tableName using column_stats(date) options(expr='date_format', format='yyyy')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val dateFormatDefault = resolveExpr(spark, unapply(functions.date_format(functions.col("date"), "yyyy")).get, tableSchema)
+        dataFilter = EqualTo(dateFormatDefault, lit("2020").expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_date_format on $tableName")
+
+        spark.sql(s"create index idx_to_timestamp_default on $tableName using column_stats(date) options(expr='to_timestamp')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val toTimestampDefault = resolveExpr(spark, unapply(functions.to_timestamp(functions.col("date"))).get, tableSchema)
+        dataFilter = EqualTo(toTimestampDefault, lit(1732924800000000L).expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_to_timestamp_default on $tableName")
+
+        spark.sql(s"create index idx_to_timestamp on $tableName using column_stats(date) options(expr='to_timestamp', format='yyyy-MM-dd')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val toTimestamp = resolveExpr(spark, unapply(functions.to_timestamp(functions.col("date"), "yyyy-MM-dd")).get, tableSchema)
+        dataFilter = EqualTo(toTimestamp, lit(1732924800000000L).expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_to_timestamp on $tableName")
+
+        spark.sql(s"create index idx_date_add on $tableName using column_stats(date) options(expr='date_add', days='10')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val dateAdd = resolveExpr(spark, unapply(functions.date_add(functions.col("date"), 10)).get, tableSchema)
+        dataFilter = EqualTo(dateAdd, lit(18606).expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_date_add on $tableName")
+
+        spark.sql(s"create index idx_date_sub on $tableName using column_stats(date) options(expr='date_sub', days='10')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val dateSub = resolveExpr(spark, unapply(functions.date_sub(functions.col("date"), 10)).get, tableSchema)
+        dataFilter = EqualTo(dateSub, lit(18586).expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_date_sub on $tableName")
+      }
+    }
+  }
+
+  /**
+   * Test expression index with data skipping for string expressions.
+   */
+  @Test
+  def testColumnStatsPruningWithStringExpressions(): Unit = {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName + s"_stats_pruning_string_expr_$tableType"
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        spark.sql(
+          s"""
+             CREATE TABLE $tableName (
+             | ts LONG,
+             | id STRING,
+             | rider STRING,
+             | driver STRING,
+             | fare DOUBLE,
+             | dateDefault STRING,
+             | date STRING,
+             | city STRING,
+             | state STRING
+             |) USING HUDI
+             |options(
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  hoodie.metadata.enable = 'true',
+             |  hoodie.datasource.write.recordkey.field = 'id',
+             |  hoodie.enable.data.skipping = 'true'
+             |)
+             |PARTITIONED BY (state)
+             |location '$basePath'
+             |""".stripMargin)
+
+        spark.sql("set hoodie.parquet.small.file.limit=0")
+        spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+        if (HoodieSparkUtils.gteqSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled=false")
+        }
+
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, date, city, state) VALUES
+             |    (1695414527,'trip1','rider-A','driver-K',19.10, '2020-11-30 01:30:40', '2020-11-30', 'san_francisco','california'),
+             |    (1695414531,'trip6','rider-C','driver-K',17.14, '2021-11-30 01:30:40', '2021-11-30', 'san_diego','california'),
+             |    (1695332066,'trip3','rider-E','driver-O',93.50, '2022-11-30 01:30:40', '2022-11-30', 'austin','texas'),
+             |    (1695516137,'trip4','rider-F','driver-P',34.15, '2023-11-30 01:30:40', '2023-11-30', 'houston','texas')
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, date, city, state) VALUES
+             |    (1695414520,'trip2','rider-C','driver-M',27.70,'2024-11-30 01:30:40', '2024-11-30', 'sunnyvale','california'),
+             |    (1699349649,'trip5','rider-A','driver-Q',3.32, '2019-11-30 01:30:40', '2019-11-30', 'san_diego','texas')
+             |""".stripMargin)
val tableSchema: StructType =
StructType(
@@ -814,23 +1012,67 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
StructField("rider", StringType),
StructField("driver", StringType),
StructField("fare", DoubleType),
+ StructField("dateDefault", StringType),
+ StructField("date", StringType),
StructField("city", StringType),
StructField("state", StringType)
)
)
         val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true")
-        metaClient = createMetaClient(spark, basePath)
+        // With unary expression
+        spark.sql(s"create index idx_lower on $tableName using column_stats(rider) options(expr='lower')")
+        var metaClient = createMetaClient(spark, basePath)
         // validate skipping with both types of expression
         val lowerExpr = resolveExpr(spark, unapply(functions.lower(functions.col("rider"))).get, tableSchema)
         var literal = Literal.create("rider-c")
         var dataFilter = EqualTo(lowerExpr, literal)
         verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_lower on $tableName")
-        val fromUnixTime = resolveExpr(spark, unapply(functions.from_unixtime(functions.col("ts"), "yyyy-MM-dd")).get, tableSchema)
-        literal = Literal.create("2023-11-07")
-        dataFilter = EqualTo(fromUnixTime, literal)
+        spark.sql(s"create index idx_substring on $tableName using column_stats(driver) options(expr='substring', pos='8', len='1')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val substring = resolveExpr(spark, unapply(functions.substring(functions.col("driver"), 8, 1)).get, tableSchema)
+        dataFilter = EqualTo(substring, lit("K").expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_substring on $tableName")
+
+        spark.sql(s"create index idx_trim on $tableName using column_stats(driver) options(expr='trim', trimString='-K')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val trim = resolveExpr(spark, unapply(functions.trim(functions.col("driver"), "-K")).get, tableSchema)
+        dataFilter = EqualTo(trim, lit("driver").expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_trim on $tableName")
+
+        spark.sql(s"create index idx_rtrim on $tableName using column_stats(driver) options(expr='rtrim', trimString='-K')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val rtrim = resolveExpr(spark, unapply(functions.rtrim(functions.col("driver"), "-K")).get, tableSchema)
+        dataFilter = EqualTo(rtrim, lit("driver").expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_rtrim on $tableName")
+
+        spark.sql(s"create index idx_ltrim on $tableName using column_stats(driver) options(expr='ltrim', trimString='driver-')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val ltrim = resolveExpr(spark, unapply(functions.ltrim(functions.col("driver"), "driver-")).get, tableSchema)
+        dataFilter = EqualTo(ltrim, lit("K").expr)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_ltrim on $tableName")
+
+        spark.sql(s"create index idx_regexp on $tableName using column_stats(rider) options(expr='regexp_replace', pattern='rider', replacement='passenger')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val regExpReplace = resolveExpr(spark, unapply(functions.regexp_replace(functions.col("rider"), "rider", "passenger")).get, tableSchema)
+        literal = Literal.create("passenger-F")
+        dataFilter = EqualTo(regExpReplace, literal)
+        verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_regexp on $tableName")
+
+        spark.sql(s"create index idx_regexp_extract on $tableName using column_stats(driver) options(expr='regexp_extract', pattern='driver-(\\\\w+)', idx='1')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val regExpExtract = resolveExpr(spark, unapply(functions.regexp_extract(functions.col("driver"), "driver-(\\w+)", 1)).get, tableSchema)
+        literal = Literal.create("K")
+        dataFilter = EqualTo(regExpExtract, literal)
         verifyFilePruning(opts, dataFilter, metaClient, isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_regexp_extract on $tableName")
}
}
}
@@ -888,7 +1130,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
|""".stripMargin)
         // create index using bloom filters on city column with upper() function
-        spark.sql(s"create index idx_bloom_$tableName on $tableName using bloom_filters(city) options(expr='upper', numHashFunctions=1, fpp=0.00000000001)")
+        spark.sql(s"create index idx_bloom_$tableName on $tableName using bloom_filters(city) options(expr='upper')")
         // Pruning takes place only if query uses upper function on city
         checkAnswer(s"select id, rider from $tableName where upper(city) in ('sunnyvale', 'sg')")()
@@ -907,7 +1149,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
// drop index and recreate without upper() function
spark.sql(s"drop index idx_bloom_$tableName on $tableName")
- spark.sql(s"create index idx_bloom_$tableName on $tableName using
bloom_filters(city) options(numHashFunctions=1, fpp=0.00000000001)")
+ spark.sql(s"create index idx_bloom_$tableName on $tableName using
bloom_filters(city)")
// Pruning takes place only if query uses no function on city
checkAnswer(s"select id, rider from $tableName where city =
'sunnyvale'")(
Seq("trip2", "rider-C")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 48b1f907f8c..b30a2adb353 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql.hudi.common
+import org.apache.hadoop.fs.Path
import org.apache.hudi.DefaultSparkRecordMerger
+import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
@@ -27,13 +29,10 @@ import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest}
-import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
-
-import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
-import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageContains
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
import org.scalactic.source
@@ -130,6 +129,26 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     assertResult(expects.map(row => Row(row: _*)).toArray)(array)
   }
+  protected def checkNestedExceptionContains(sql: String)(errorMsg: String): Unit = {
+ var hasException = false
+ try {
+ spark.sql(sql)
+ } catch {
+ case e: Throwable =>
+ var t = e
+ while (t != null) {
+        if (t.getMessage != null && t.getMessage.trim.contains(errorMsg.trim)) {
+ hasException = true
+ }
+ t = t.getCause
+ }
+ if (!hasException) {
+ e.printStackTrace(System.err)
+ }
+ }
+ assertResult(true)(hasException)
+ }
+
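A usage sketch for the helper above (mirroring the invalid-options test earlier in this
patch; the table name and option values are illustrative): because the helper walks the
full cause chain, the assertion holds however deeply Spark wraps the originating
HoodieIndexException.

    checkNestedExceptionContains(
      s"create index idx_bad on $tableName using column_stats(ts) options(expr='from_unixtime', invalidOp='x')")(
      "Input options [invalidOp] are not valid for spark function")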
protected def checkExceptions(sql: String)(errorMsgs: Seq[String]): Unit = {
var hasException = false
try {