This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ae5833ea34a [HUDI-8533] Bloom functional Index creation without function fails (#12290)
ae5833ea34a is described below
commit ae5833ea34afc8849722509d185f4e94dd3fb424
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Nov 22 20:18:49 2024 +0530
[HUDI-8533] Bloom functional Index creation without function fails (#12290)
Allow creation of a functional index without an input function. A default identity function is used in such a case.
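For example (illustrative SQL mirroring the test change in this patch; the table name is hypothetical), the following now succeeds and defaults to the identity function:
    create index name_lower on hudi_table using column_stats(ts)
Previously this had to be written with options(func='identity').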
---------
Co-authored-by: vinoth chandar <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
---
.../client/utils/SparkMetadataWriterUtils.java | 20 +--
.../SparkHoodieBackedTableMetadataWriter.java | 7 +-
.../hudi/common/table/HoodieTableMetaClient.java | 8 +-
.../index/functional/HoodieFunctionalIndex.java | 2 +-
.../apache/hudi/metadata/BaseTableMetadata.java | 12 +-
.../metadata/FileSystemBackedTableMetadata.java | 4 +-
.../hudi/metadata/HoodieMetadataPayload.java | 14 +-
.../apache/hudi/metadata/HoodieTableMetadata.java | 31 +++-
.../hudi/metadata/MetadataPartitionType.java | 7 +
.../common/table/TestHoodieTableMetaClient.java | 29 ++++
.../org/apache/hudi/FunctionalIndexSupport.scala | 136 +++++++++++-----
.../org/apache/hudi/RecordLevelIndexSupport.scala | 10 +-
.../apache/hudi/TestSecondaryIndexSupport.scala | 1 -
.../spark/sql/hudi/command/IndexCommands.scala | 6 +-
.../functional/TestSecondaryIndexPruning.scala | 111 ++++++++++++-
.../hudi/command/index/TestFunctionalIndex.scala | 175 +++++++++++++++++++--
16 files changed, 485 insertions(+), 88 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 4a9c326b296..484ddb651eb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -82,7 +83,7 @@ public class SparkMetadataWriterUtils {
public static Column[] getFunctionalIndexColumns() {
return new Column[] {
functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION),
- functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH),
+ functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_RELATIVE_FILE_PATH),
functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE)
};
}
@@ -90,7 +91,7 @@ public class SparkMetadataWriterUtils {
public static String[] getFunctionalIndexColumnNames() {
return new String[] {
HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
- HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH,
+ HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_RELATIVE_FILE_PATH,
HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE
};
}
@@ -129,13 +130,13 @@ public class SparkMetadataWriterUtils {
long valueCount = row.getLong(baseAggregatePosition + 3);
String partitionName = row.getString(0);
- String filePath = row.getString(1);
+ String relativeFilePath = row.getString(1);
long totalFileSize = row.getLong(2);
// Total uncompressed size is harder to get directly. This is just an approximation to maintain the order.
long totalUncompressedSize = totalFileSize * 2;
HoodieColumnRangeMetadata<Comparable> rangeMetadata =
HoodieColumnRangeMetadata.create(
- filePath,
+ relativeFilePath,
columnToIndex,
minValue,
maxValue,
@@ -150,21 +151,22 @@ public class SparkMetadataWriterUtils {
}
public static HoodieData<HoodieRecord> getFunctionalIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String columnToIndex,
- HoodieWriteConfig metadataWriteConfig, String instantTime) {
+ HoodieWriteConfig metadataWriteConfig, String instantTime, String indexName) {
// Group data using functional index metadata and then create bloom filter on the group
Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex, SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
- // row.get(0) refers to partition path value and row.get(1) refers to file name.
- .groupByKey((MapFunction<Row, Pair>) row -> Pair.of(row.getString(0), row.getString(1)), Encoders.kryo(Pair.class))
+ // row.get(1) refers to partition path value and row.get(2) refers to file name.
+ .groupByKey((MapFunction<Row, Pair>) row -> Pair.of(row.getString(1), row.getString(2)), Encoders.kryo(Pair.class))
.flatMapGroups((FlatMapGroupsFunction<Pair, Row, HoodieRecord>) ((pair, iterator) -> {
String partition = pair.getLeft().toString();
- String fileName = pair.getRight().toString();
+ String relativeFilePath = pair.getRight().toString();
+ String fileName = FSUtils.getFileName(relativeFilePath, partition);
BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig);
iterator.forEachRemaining(row -> {
byte[] key = row.getAs(columnToIndex).toString().getBytes();
bloomFilter.add(key);
});
ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
- HoodieRecord bloomFilterRecord = createBloomFilterMetadataRecord(partition, fileName, instantTime, metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false);
+ HoodieRecord bloomFilterRecord = createBloomFilterMetadataRecord(partition, fileName, instantTime, metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false, indexName);
return Collections.singletonList(bloomFilterRecord).iterator();
}), Encoders.kryo(HoodieRecord.class));
return HoodieJavaRDD.of(bloomFilterRecords.javaRDD());
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 6eb3d51247b..2e1f6352e37 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -191,17 +191,18 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
String partition = entry.getKey();
Pair<String, Long> filePathSizePair = entry.getValue();
String filePath = filePathSizePair.getKey();
+ String relativeFilePath = FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new StoragePath(filePath));
long fileSize = filePathSizePair.getValue();
List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig, FSUtils.isBaseFile(new StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
- List<Row> rowsWithIndexMetadata = SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, partition, filePath, fileSize);
+ List<Row> rowsWithIndexMetadata = SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, partition, relativeFilePath, fileSize);
return rowsWithIndexMetadata.iterator();
});
// Generate dataset with functional index metadata
StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION, DataTypes.StringType, false, Metadata.empty()))
- .add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH, DataTypes.StringType, false, Metadata.empty()))
+ .add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_RELATIVE_FILE_PATH, DataTypes.StringType, false, Metadata.empty()))
.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE, DataTypes.LongType, false, Metadata.empty()));
Dataset<Row> rowDataset = sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(), structType);
@@ -215,7 +216,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
return SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingColumnStats(rowDataset, functionalIndex, columnToIndex);
} else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
- return SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(rowDataset, columnToIndex, metadataWriteConfig, instantTime);
+ return SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(rowDataset, columnToIndex, metadataWriteConfig, instantTime, indexDefinition.getIndexName());
} else {
throw new UnsupportedOperationException(indexDefinition.getIndexType() + " is not yet supported");
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 442d0d81be6..428cfe0777f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -71,7 +71,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -90,6 +89,7 @@ import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.index.functional.HoodieFunctionalIndex.SPARK_IDENTITY;
import static org.apache.hudi.io.storage.HoodieIOFactory.getIOFactory;
/**
@@ -219,11 +219,13 @@ public class HoodieTableMetaClient implements Serializable {
"Index metadata is already present");
String indexMetaPath = getIndexDefinitionPath();
List<String> columnNames = new ArrayList<>(columns.keySet());
- HoodieIndexDefinition indexDefinition = new HoodieIndexDefinition(indexName, indexType, options.get("func"), columnNames, options);
+ HoodieIndexDefinition indexDefinition = new HoodieIndexDefinition(indexName, indexType, options.getOrDefault("func", SPARK_IDENTITY), columnNames, options);
if (indexMetadataOpt.isPresent()) {
indexMetadataOpt.get().getIndexDefinitions().put(indexName, indexDefinition);
} else {
- indexMetadataOpt = Option.of(new HoodieIndexMetadata(Collections.singletonMap(indexName, indexDefinition)));
+ Map<String, HoodieIndexDefinition> indexDefinitionMap = new HashMap<>();
+ indexDefinitionMap.put(indexName, indexDefinition);
+ indexMetadataOpt = Option.of(new HoodieIndexMetadata(indexDefinitionMap));
}
try {
FileIOUtils.createFileInPath(storage, new StoragePath(indexMetaPath), Option.of(getUTF8Bytes(indexMetadataOpt.get().toJson())));
diff --git a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java b/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java
index 4edb1e4c090..df3cc10f434 100644
--- a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java
@@ -31,7 +31,7 @@ import java.util.List;
*/
public interface HoodieFunctionalIndex<S, T> extends Serializable {
- String HOODIE_FUNCTIONAL_INDEX_FILE_PATH = "_hoodie_functional_index_file_path";
+ String HOODIE_FUNCTIONAL_INDEX_RELATIVE_FILE_PATH = "_hoodie_functional_index_relative_file_path";
String HOODIE_FUNCTIONAL_INDEX_PARTITION = "_hoodie_functional_index_partition";
String HOODIE_FUNCTIONAL_INDEX_FILE_SIZE = "_hoodie_functional_index_file_size";
String SPARK_DATE_FORMAT = "date_format";
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index cef4dd9352f..4aea9eeb356 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -166,14 +166,14 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata {
}
@Override
- public Option<BloomFilter> getBloomFilter(final String partitionName, final String fileName) throws HoodieMetadataException {
- if (!dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.BLOOM_FILTERS)) {
+ public Option<BloomFilter> getBloomFilter(final String partitionName, final String fileName, final String metadataPartitionName) throws HoodieMetadataException {
+ if (!dataMetaClient.getTableConfig().getMetadataPartitions().contains(metadataPartitionName)) {
LOG.error("Metadata bloom filter index is disabled!");
return Option.empty();
}
final Pair<String, String> partitionFileName = Pair.of(partitionName, fileName);
- Map<Pair<String, String>, BloomFilter> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName));
+ Map<Pair<String, String>, BloomFilter> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName), metadataPartitionName);
if (bloomFilters.isEmpty()) {
LOG.error("Meta index: missing bloom filter for partition: {}, file:
{}", partitionName, fileName);
return Option.empty();
@@ -184,9 +184,9 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata {
}
@Override
- public Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
+ public Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList, final String metadataPartitionName)
throws HoodieMetadataException {
- if (!dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.BLOOM_FILTERS)) {
+ if (!dataMetaClient.getTableConfig().getMetadataPartitions().contains(metadataPartitionName)) {
LOG.error("Metadata bloom filter index is disabled!");
return Collections.emptyMap();
}
@@ -206,7 +206,7 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata {
List<String> partitionIDFileIDStringsList = new ArrayList<>(partitionIDFileIDStrings);
Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
- getRecordsByKeys(partitionIDFileIDStringsList, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+ getRecordsByKeys(partitionIDFileIDStringsList, metadataPartitionName);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, timer.endTimer()));
metrics.ifPresent(m -> m.setMetric(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_FILE_COUNT_STR, partitionIDFileIDStringsList.size()));
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index d0036e6bc5b..efafbad5a08 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -285,13 +285,13 @@ public class FileSystemBackedTableMetadata extends AbstractHoodieTableMetadata {
// no-op
}
- public Option<BloomFilter> getBloomFilter(final String partitionName, final String fileName)
+ public Option<BloomFilter> getBloomFilter(final String partitionName, final String fileName, final String metadataPartitionName)
throws HoodieMetadataException {
throw new HoodieMetadataException("Unsupported operation: getBloomFilter for " + fileName);
}
@Override
- public Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
+ public Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList, final String metadataPartitionName)
throws HoodieMetadataException {
throw new HoodieMetadataException("Unsupported operation: getBloomFilters!");
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index b3562b86d2b..b0bb9670825 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -288,6 +288,15 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
return new HoodieAvroRecord<>(key, payload);
}
+ public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(final String partitionName,
+ final String baseFileName,
+ final String timestamp,
+ final String bloomFilterType,
+ final ByteBuffer bloomFilter,
+ final boolean isDeleted) {
+ return createBloomFilterMetadataRecord(partitionName, baseFileName, timestamp, bloomFilterType, bloomFilter, isDeleted, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+ }
+
/**
* Create bloom filter metadata record.
*
@@ -303,12 +312,13 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
final String timestamp,
final String bloomFilterType,
final ByteBuffer bloomFilter,
- final boolean isDeleted) {
+ final boolean isDeleted,
+ String metadataPartitionName) {
checkArgument(!baseFileName.contains(StoragePath.SEPARATOR)
&& FSUtils.isBaseFile(new StoragePath(baseFileName)),
"Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
final String bloomFilterIndexKey = getBloomFilterRecordKey(partitionName, baseFileName);
- HoodieKey key = new HoodieKey(bloomFilterIndexKey, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+ HoodieKey key = new HoodieKey(bloomFilterIndexKey, metadataPartitionName);
HoodieMetadataBloomFilter metadataBloomFilter =
new HoodieMetadataBloomFilter(bloomFilterType, timestamp, bloomFilter, isDeleted);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 816e34fce49..af11bfaf6e8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -202,7 +202,21 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
* @return BloomFilter if available, otherwise empty
* @throws HoodieMetadataException
*/
- Option<BloomFilter> getBloomFilter(final String partitionName, final String fileName)
+ default Option<BloomFilter> getBloomFilter(final String partitionName, final String fileName)
+ throws HoodieMetadataException {
+ return getBloomFilter(partitionName, fileName, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+ }
+
+ /**
+ * Get the bloom filter for the FileID from the metadata table.
+ *
+ * @param partitionName - Partition name
+ * @param fileName - File name for which bloom filter needs to be retrieved
+ * @param metadataPartitionName - Metadata partition name
+ * @return BloomFilter if available, otherwise empty
+ * @throws HoodieMetadataException
+ */
+ Option<BloomFilter> getBloomFilter(final String partitionName, final String fileName, final String metadataPartitionName)
throws HoodieMetadataException;
/**
@@ -212,7 +226,20 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
* @return Map of partition file name pair to its bloom filter
* @throws HoodieMetadataException
*/
- Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
+ default Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList)
+ throws HoodieMetadataException {
+ return getBloomFilters(partitionNameFileNameList, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+ }
+
+ /**
+ * Get bloom filters for files from the metadata table index.
+ *
+ * @param partitionNameFileNameList - List of partition and file name pair for which bloom filters need to be retrieved
+ * @param metadataPartitionName - Metadata partition name
+ * @return Map of partition file name pair to its bloom filter
+ * @throws HoodieMetadataException
+ */
+ Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList, final String metadataPartitionName)
throws HoodieMetadataException;
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 00d1cc0ff05..ab8e74a9e9e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -335,6 +335,13 @@ public enum MetadataPartitionType {
|| metadataPartitionPath.startsWith(FUNCTIONAL_INDEX.getPartitionPath());
}
+ public static String getGenericIndexNameWithoutPrefix(String indexName) {
+ String prefix = indexName.startsWith(SECONDARY_INDEX.getPartitionPath())
+ ? SECONDARY_INDEX.getPartitionPath()
+ : FUNCTIONAL_INDEX.getPartitionPath();
+ return indexName.substring(prefix.length());
+ }
+
// Partition path in metadata table.
private final String partitionPath;
// FileId prefix used for all file groups in this partition.
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index 171230a3dda..b6a541a9f0f 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -18,13 +18,16 @@
package org.apache.hudi.common.table;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StoragePath;
import org.apache.hadoop.fs.Path;
@@ -33,6 +36,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
@@ -243,4 +249,27 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key(), "/a/b/c");
assertEquals(randomDefinitionPath, metaClient.getIndexDefinitionPath());
}
+
+ @Test
+ public void testDeleteDefinition() throws IOException {
+ final String basePath = tempDir.toAbsolutePath() + Path.SEPARATOR + "t7";
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.newTableBuilder()
+ .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+ .setTableName("table")
+ .initTable(this.metaClient.getStorageConf(), basePath);
+ Map<String, Map<String, String>> columnsMap = new HashMap<>();
+ columnsMap.put("c1", Collections.emptyMap());
+ String indexName = MetadataPartitionType.FUNCTIONAL_INDEX.getPartitionPath() + "idx";
+ metaClient.buildIndexDefinition(indexName, "column_stats",
+ columnsMap, Collections.emptyMap());
+ assertTrue(metaClient.getIndexMetadata().get().getIndexDefinitions().containsKey(indexName));
+ assertTrue(metaClient.getStorage().exists(new StoragePath(metaClient.getIndexDefinitionPath())));
+ metaClient.deleteIndexDefinition(indexName);
+ assertTrue(metaClient.getIndexMetadata().isEmpty());
+ assertTrue(metaClient.getStorage().exists(new StoragePath(metaClient.getIndexDefinitionPath())));
+ // Read from storage
+ HoodieIndexMetadata indexMetadata = HoodieIndexMetadata.fromJson(
+ new String(FileIOUtils.readDataFromPath(metaClient.getStorage(), new StoragePath(metaClient.getIndexDefinitionPath())).get()));
+ assertTrue(indexMetadata.getIndexDefinitions().isEmpty());
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
index 9a56a865c7d..a6689ba5fce 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
@@ -22,19 +22,20 @@ package org.apache.hudi
import org.apache.hudi.FunctionalIndexSupport._
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieSparkFunctionalIndex.SPARK_FUNCTION_MAP
+import org.apache.hudi.RecordLevelIndexSupport.filterQueryWithRecordKey
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, HoodieMetadataRecord}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
-import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
+import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.hash.{ColumnIndexID, PartitionIndexID}
import org.apache.hudi.data.HoodieJavaRDD
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.util.JFunction
import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -57,12 +58,21 @@ class FunctionalIndexSupport(spark: SparkSession,
prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
shouldPushDownFilesFilter: Boolean
): Option[Set[String]] = {
- lazy val functionalIndexPartitionOpt = getFunctionalIndexPartition(queryFilters)
+ lazy val functionalIndexPartitionOpt = getFunctionalIndexPartitionAndLiterals(queryFilters)
if (isIndexAvailable && queryFilters.nonEmpty && functionalIndexPartitionOpt.nonEmpty) {
- val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
- val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
- val indexDf = loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, prunedPartitions, readInMemory)
- Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+ val (indexPartition, literals) = functionalIndexPartitionOpt.get
+ val indexDefinition = metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
+ if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
+ val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
+ val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
+ val indexDf = loadFunctionalIndexDataFrame(indexPartition, prunedPartitions, readInMemory)
+ Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+ } else if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
+ val prunedPartitionAndFileNames = getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices, includeLogFiles = true)
+ Option.apply(getCandidateFilesForKeys(indexPartition, prunedPartitionAndFileNames, literals))
+ } else {
+ Option.empty
+ }
} else {
Option.empty
}
@@ -79,6 +89,23 @@ class FunctionalIndexSupport(spark: SparkSession,
metadataConfig.isEnabled && metaClient.getIndexMetadata.isPresent && !metaClient.getIndexMetadata.get().getIndexDefinitions.isEmpty
}
+ def filterQueriesWithFunctionalFilterKey(queryFilters: Seq[Expression], sourceFieldOpt: Option[String]): List[Tuple2[Expression, List[String]]] = {
+ var functionalIndexQueries: List[Tuple2[Expression, List[String]]] = List.empty
+ for (query <- queryFilters) {
+ filterQueryWithRecordKey(query, sourceFieldOpt, (expr: Expression) => {
+ expr match {
+ case expression: UnaryExpression => expression.child
+ case other => other
+ }
+ }).foreach({
+ case (exp: Expression, literals: List[String]) =>
+ functionalIndexQueries = functionalIndexQueries :+ Tuple2.apply(exp, literals)
+ })
+ }
+
+ functionalIndexQueries
+ }
+
/**
* Searches for an index partition based on the specified index function and target column name.
*
@@ -92,18 +119,20 @@ class FunctionalIndexSupport(spark: SparkSession,
* @return An `Option` containing the index partition identifier if a matching index definition is found.
* Returns `None` if no matching index definition is found.
*/
- private def getFunctionalIndexPartition(queryFilters: Seq[Expression]): Option[String] = {
- val functionToColumnNames = extractSparkFunctionNames(queryFilters)
- if (functionToColumnNames.nonEmpty) {
- // Currently, only one functional index in the query is supported. HUDI-7620 for supporting multiple functions.
- checkState(functionToColumnNames.size == 1, "Currently, only one function with functional index in the query is supported")
- val (indexFunction, targetColumnName) = functionToColumnNames.head
- val indexDefinitions = metaClient.getIndexMetadata.get().getIndexDefinitions
- indexDefinitions.asScala.collectFirst {
- case (indexPartition, indexDefinition)
- if indexDefinition.getIndexFunction.equals(indexFunction) && indexDefinition.getSourceFields.contains(targetColumnName) =>
- indexPartition
- }
+ private def getFunctionalIndexPartitionAndLiterals(queryFilters: Seq[Expression]): Option[Tuple2[String, List[String]]] = {
+ val indexDefinitions = metaClient.getIndexMetadata.get().getIndexDefinitions.asScala
+ if (indexDefinitions.nonEmpty) {
+ val functionDefinitions = indexDefinitions.values
+ .filter(definition => MetadataPartitionType.fromPartitionPath(definition.getIndexName).equals(MetadataPartitionType.FUNCTIONAL_INDEX))
+ .toList
+ var indexPartitionAndLiteralsOpt: Option[Tuple2[String, List[String]]] = Option.empty
+ functionDefinitions.foreach(indexDefinition => {
+ val queryInfoOpt = extractQueryAndLiterals(queryFilters, indexDefinition)
+ if (queryInfoOpt.isDefined) {
+ indexPartitionAndLiteralsOpt = Option.apply(Tuple2.apply(indexDefinition.getIndexName, queryInfoOpt.get._2))
+ }
+ })
+ indexPartitionAndLiteralsOpt
} else {
Option.empty
}
@@ -118,20 +147,18 @@ class FunctionalIndexSupport(spark: SparkSession,
* one of the functions and operates on a single column, this method maps the function name to the
* column name.
*/
- private def extractSparkFunctionNames(queryFilters: Seq[Expression]): Map[String, String] = {
- queryFilters.flatMap { expr =>
- // Support only simple binary expression on single column
- if (expr.references.size == 1) {
- val targetColumnName = expr.references.head.name
- // Check if the expression string contains any of the function names
- val exprString = expr.toString
- SPARK_FUNCTION_MAP.asScala.keys
- .find(exprString.contains)
- .map(functionName => functionName -> targetColumnName)
- } else {
- None // Skip expressions that do not match the criteria
+ private def extractQueryAndLiterals(queryFilters: Seq[Expression], indexDefinition: HoodieIndexDefinition): Option[(Expression, List[String])] = {
+ val functionalIndexQueries = filterQueriesWithFunctionalFilterKey(queryFilters, Option.apply(indexDefinition.getSourceFields.get(0)))
+ var queryAndLiteralsOpt: Option[(Expression, List[String])] = Option.empty
+ functionalIndexQueries.foreach { tuple =>
+ val (expr, literals) = (tuple._1, tuple._2)
+ val functionNameOption = SPARK_FUNCTION_MAP.asScala.keys.find(expr.toString.contains)
+ val functionName = functionNameOption.getOrElse("identity")
+ if (indexDefinition.getIndexFunction.equals(functionName)) {
+ queryAndLiteralsOpt = Option.apply(Tuple2.apply(expr, literals))
}
- }.toMap
+ }
+ queryAndLiteralsOpt
}
def loadFunctionalIndexDataFrame(indexPartition: String,
@@ -139,11 +166,6 @@ class FunctionalIndexSupport(spark: SparkSession,
shouldReadInMemory: Boolean): DataFrame = {
val colStatsDF = {
val indexDefinition = metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
- val indexType = indexDefinition.getIndexType
- // NOTE: Currently only functional indexes created using column_stats is supported.
- // HUDI-7007 tracks for adding support for other index types such as bloom filters.
- checkState(indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS),
- s"Index type $indexType is not supported")
val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = loadFunctionalIndexForColumnsInternal(
indexDefinition.getSourceFields.asScala.toSeq, prunedPartitions, indexPartition, shouldReadInMemory)
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala 2.12+
@@ -198,6 +220,44 @@ class FunctionalIndexSupport(spark: SparkSession,
columnStatsRecords
}
+
+ def getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+ includeLogFiles: Boolean = false): Map[String, Set[String]] = {
+ prunedPartitionsAndFileSlices.foldLeft(Map.empty[String, Set[String]]) {
+ case (partitionToFileMap, (partitionPathOpt, fileSlices)) =>
+ partitionPathOpt match {
+ case Some(partitionPath) =>
+ val fileNames = fileSlices.flatMap { fileSlice =>
+ val baseFile = Option(fileSlice.getBaseFile.orElse(null)).map(_.getFileName).toSeq
+ val logFiles = if (includeLogFiles) {
+ fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toSeq
+ } else Seq.empty[String]
+ baseFile ++ logFiles
+ }.toSet
+
+ // Update the map with the new partition and its file names
+ partitionToFileMap.updated(partitionPath.path, partitionToFileMap.getOrElse(partitionPath.path, Set.empty) ++ fileNames)
+ case None =>
+ partitionToFileMap // Skip if no partition path
+ }
+ }
+ }
+
+ private def getCandidateFilesForKeys(indexPartition: String, prunedPartitionAndFileNames: Map[String, Set[String]], keys: List[String]): Set[String] = {
+ val candidateFiles = prunedPartitionAndFileNames.flatMap { case (partition, fileNames) =>
+ fileNames.filter { fileName =>
+ val bloomFilterOpt = toScalaOption(metadataTable.getBloomFilter(partition, fileName, indexPartition))
+ bloomFilterOpt match {
+ case Some(bloomFilter) =>
+ keys.exists(bloomFilter.mightContain)
+ case None =>
+ true // If bloom filter is empty or undefined, assume the file might contain the record key
+ }
+ }
+ }.toSet
+
+ candidateFiles
+ }
}
object FunctionalIndexSupport {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 51e0f846551..fdaa22e5a50 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -129,10 +129,16 @@ object RecordLevelIndexSupport {
* @param queryFilter The query that need to be filtered.
* @return Tuple of filtered query and list of record key literals that need to be matched
*/
+
def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String]): Option[(Expression, List[String])] = {
+ filterQueryWithRecordKey(queryFilter, recordKeyOpt, attributeFetcher = expr => expr)
+ }
+
+ def filterQueryWithRecordKey(queryFilter: Expression, recordKeyOpt: Option[String], attributeFetcher: Function1[Expression, Expression]
+ ): Option[(Expression, List[String])] = {
queryFilter match {
case equalToQuery: EqualTo =>
- val attributeLiteralTuple = getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
+ val attributeLiteralTuple = getAttributeLiteralTuple(attributeFetcher.apply(equalToQuery.left), attributeFetcher.apply(equalToQuery.right)).orNull
if (attributeLiteralTuple != null) {
val attribute = attributeLiteralTuple._1
val literal = attributeLiteralTuple._2
@@ -147,7 +153,7 @@ object RecordLevelIndexSupport {
case inQuery: In =>
var validINQuery = true
- inQuery.value match {
+ attributeFetcher.apply(inQuery.value) match {
case attribute: AttributeReference =>
if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
validINQuery = false
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
index 5b487b740a9..67bae0ef392 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
@@ -101,5 +101,4 @@ class TestSecondaryIndexSupport {
result = filterQueriesWithSecondaryKey(Seq(testFilter), Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))._2
assertTrue(result.isEmpty)
}
-
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
index d6d0b6fe233..6e975f08baf 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.exception.HoodieIndexException
import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.metadata.MetadataPartitionType
+import org.apache.hudi.metadata.{HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -51,7 +51,9 @@ case class CreateIndexCommand(table: CatalogTable,
new util.LinkedHashMap[String, java.util.Map[String, String]]()
columns.map(c => columnsMap.put(c._1.mkString("."), c._2.asJava))
- if (options.contains("func") || indexType.equals("secondary_index")) {
+ if (indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)
+ || indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+ || indexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
val extraOpts = options ++ table.properties
HoodieSparkIndexClient.getInstance(sparkSession).create(
metaClient, indexName, indexType, columnsMap, extraOpts.asJava)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 8969f38f642..d2afdc5e784 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -988,7 +988,6 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
}
}
-
/**
* Test case to write with updates and validate secondary index with clustering.
*/
@@ -1112,6 +1111,8 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
}
}
+
+
private def confirmLastCommitType(actionType: ActionType): Unit = {
metaClient = HoodieTableMetaClient.reload(metaClient)
val instants = metaClient.getActiveTimeline.getInstants
@@ -1197,13 +1198,117 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
}
}
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testUpdatesReInsertsDeletes(hoodieTableType: HoodieTableType): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ val tableType = hoodieTableType.name()
+ var hudiOpts = commonOpts
+ hudiOpts = hudiOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+ val sqlTableType = if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
+ val tableName = s"test_updates_reinserts_deletes_$sqlTableType"
+
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | ts BIGINT,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | city STRING,
+ | state STRING
+ |) USING HUDI
+ | options(
+ | primaryKey ='id',
+ | type = '$sqlTableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'id',
+ | hoodie.enable.data.skipping = 'true'
+ | )
+ | PARTITIONED BY (city, state)
+ | location '$basePath'
+ |""".stripMargin)
+
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.metadata.enable=true")
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled=false")
+ }
+
+ spark.sql(
+ s"""|INSERT INTO $tableName(ts, id, rider, driver, fare, city, state)
VALUES
+ |
(1695159649,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
+ |
(1695091554,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
+ |
(1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
+ |
(1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas');
+ | """.stripMargin)
+ checkAnswer(s"select ts, id, rider, driver, fare, city, state from
$tableName;")(
+ Seq(1695159649, "trip1", "rider-A", "driver-K", 19.10,
"san_francisco", "california"),
+ Seq(1695091554, "trip2", "rider-C", "driver-M", 27.70, "sunnyvale",
"california"),
+ Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin",
"texas"),
+ Seq(1695516137, "trip4", "rider-F", "driver-P", 34.15, "houston",
"texas"))
+
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ spark.sql(s"create index idx_rider_$tableName ON $tableName USING
secondary_index(rider)")
+ checkAnswer(s"select ts, id, rider, driver, fare, city, state from
$tableName where rider = 'rider-E'")(
+ Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin",
"texas"))
+ verifyQueryPredicate(hudiOpts, "rider")
+
+ spark.sql(s"create index idx_driver_$tableName ON $tableName USING
secondary_index(driver)")
+ checkAnswer(s"select ts, id, rider, driver, fare, city, state from
$tableName where driver = 'driver-P'")(
+ Seq(1695516137, "trip4", "rider-F", "driver-P", 34.15, "houston",
"texas")
+ )
+ verifyQueryPredicate(hudiOpts, "driver")
+
+ // update such that there are two rider-E records
+ spark.sql(s"update $tableName set rider = 'rider-E' where rider =
'rider-F'")
+ checkAnswer(s"select ts, id, rider, driver, fare, city, state from
$tableName where rider = 'rider-E'")(
+ Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin",
"texas"),
+ Seq(1695516137, "trip4", "rider-E", "driver-P", 34.15, "houston",
"texas")
+ )
+ verifyQueryPredicate(hudiOpts, "rider")
+
+ // delete one of those records
+ spark.sql(s"delete from $tableName where id = 'trip4'")
+ checkAnswer(s"select ts, id, rider, driver, fare, city, state from
$tableName where rider = 'rider-E'")(
+ Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin",
"texas")
+ )
+
+ // reinsert a rider-E record while changing driver value as well.
+ spark.sql(s"insert into $tableName
values(1695516137,'trip4','rider-G','driver-Q',34.15,'houston','texas')")
+ checkAnswer(s"select ts, id, rider, driver, fare, city, state from
$tableName where driver = 'driver-Q';")(
+ Seq(1695516137, "trip4", "rider-G", "driver-Q", 34.15, "houston",
"texas")
+ )
+
+ // update two other records to rider-E as well.
+ spark.sql(s"update $tableName set rider = 'rider-E' where rider in
('rider-C','rider-G');")
+ checkAnswer(s"select ts, id, rider, driver, fare, city, state from
$tableName where rider = 'rider-E'")(
+ Seq(1695091554, "trip2", "rider-E", "driver-M", 27.70, "sunnyvale",
"california"),
+ Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin",
"texas"),
+ Seq(1695516137, "trip4", "rider-E", "driver-Q", 34.15, "houston",
"texas")
+ )
+ checkAnswer(s"select ts, id, rider, driver, fare, city, state from
$tableName where driver = 'driver-Q'")(
+ Seq(1695516137, "trip4", "rider-E", "driver-Q", 34.15, "houston",
"texas")
+ )
+ verifyQueryPredicate(hudiOpts, "rider")
+ }
+ }
+
private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
}
- private def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: String, nonExistantKey: String = ""): Unit = {
+ private def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: String, nonExistentKey: String = ""): Unit = {
mergedDfList = mergedDfList :+ spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache()
- val secondaryKey = mergedDfList.last.limit(2).collect().filter(row => !row.getAs(columnName).toString.equals(nonExistantKey))
+ val secondaryKey = mergedDfList.last.limit(2).collect().filter(row => !row.getAs(columnName).toString.equals(nonExistentKey))
.map(row => row.getAs(columnName).toString).head
val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey))
verifyFilePruning(hudiOpts, dataFilter)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 69d7f571c95..7101e38cfe4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.DataSourceWriteOptions.{INSERT_OPERATION_OPT_VAL, OPERATI
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.client.utils.SparkMetadataWriterUtils
import org.apache.hudi.{DataSourceReadOptions, FunctionalIndexSupport, HoodieFileIndex, HoodieSparkUtils}
import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
@@ -36,6 +37,7 @@ import org.apache.hudi.hive.HiveSyncConfigHolder._
import org.apache.hudi.hive.testutils.HiveTestUtil
import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.index.functional.HoodieFunctionalIndex
import org.apache.hudi.metadata.{HoodieMetadataFileSystemView, MetadataPartitionType}
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
@@ -45,12 +47,14 @@ import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{Column, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, FromUnixTime, Literal}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, FromUnixTime, Literal, Upper}
import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.hudi.command.{CreateIndexCommand, ShowIndexesCommand}
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.apache.spark.sql.types.{BinaryType, ByteType, DateType, DecimalType, IntegerType, ShortType, StringType, StructType, TimestampType}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
import org.scalatest.Ignore
import java.util.stream.Collectors
@@ -249,8 +253,9 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
assertEquals("func_index_idx_datestr",
functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
- // Verify one can create more than one functional index
- createIndexSql = s"create index name_lower on $tableName using
column_stats(ts) options(func='identity')"
+ // Verify one can create more than one functional index. When
function is not provided,
+ // default identity function is used
+ createIndexSql = s"create index name_lower on $tableName using
column_stats(ts)"
spark.sql(createIndexSql)
metaClient = createMetaClient(spark, basePath)
functionalIndexMetadata = metaClient.getIndexMetadata.get()
@@ -327,7 +332,7 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
assert(mdtPartitions.contains("func_index_name_lower") && mdtPartitions.contains("func_index_idx_datestr"))
// drop functional index
- spark.sql(s"drop index func_index_idx_datestr on $tableName")
+ spark.sql(s"drop index idx_datestr on $tableName")
// validate table config
metaClient = HoodieTableMetaClient.reload(metaClient)
assert(!metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
@@ -685,10 +690,8 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
| $partitionByClause
| location '$basePath'
""".stripMargin)
- if (tableType == "mor") {
- spark.sql("set hoodie.compact.inline=true")
- spark.sql("set hoodie.compact.inline.max.delta.commits=2")
- }
+
+ setCompactionConfigs(tableType)
if (!isPartitioned) {
// setting this for non-partitioned table to ensure multiple file groups are created
spark.sql(s"set ${HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key()}=0")
@@ -741,6 +744,100 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
}
}
+ @Test
+ def testBloomFiltersIndexPruning(): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName + s"_bloom_pruning_$tableType"
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ spark.sql(
+ s"""
+ CREATE TABLE $tableName (
+ | ts BIGINT,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | city STRING,
+ | state STRING
+ |) USING HUDI
+ |options(
+ | primaryKey ='id',
+ | type = '$tableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'id',
+ | hoodie.enable.data.skipping = 'true'
+ |)
+ |PARTITIONED BY (state)
+ |location '$basePath'
+ """.stripMargin)
+
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.metadata.enable=true")
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled=false")
+ }
+
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, city, state) VALUES
+ | (1695159649,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
+ | (1695414531,'trip6','rider-C','driver-K',17.14,'san_diego','california'),
+ | (1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
+ | (1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas')
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, city, state) VALUES
+ | (1695091554,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
+ | (1699349649,'trip5','rider-A','driver-Q',3.32,'san_diego','texas')
+ |""".stripMargin)
+
+ // create index using bloom filters on city column with upper() function
+ spark.sql(s"create index idx_bloom_$tableName on $tableName using bloom_filters(city) options(func='upper', numHashFunctions=1, fpp=0.00000000001)")
+
+ // Pruning takes place only if query uses upper function on city
+ checkAnswer(s"select id, rider from $tableName where upper(city) in
('sunnyvale', 'sg')")()
+ checkAnswer(s"select id, rider from $tableName where lower(city) =
'sunny'")()
+ checkAnswer(s"select id, rider from $tableName where upper(city) =
'SUNNYVALE'")(
+ Seq("trip2", "rider-C")
+ )
+ // verify file pruning
+ var metaClient = createMetaClient(spark, basePath)
+ val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true")
+ val cityColumn = AttributeReference("city", StringType)()
+ val upperCityExpr = Upper(cityColumn) // Apply the `upper` function to the city column
+ val targetCityUpper = Literal.create("SUNNYVALE")
+ val dataFilterUpperCityEquals = EqualTo(upperCityExpr, targetCityUpper)
+ verifyFilePruning(opts, dataFilterUpperCityEquals, metaClient, isDataSkippingExpected = true)
+
+ // drop index and recreate without upper() function
+ spark.sql(s"drop index idx_bloom_$tableName on $tableName")
+ spark.sql(s"create index idx_bloom_$tableName on $tableName using
bloom_filters(city) options(numHashFunctions=1, fpp=0.00000000001)")
+ // Pruning takes place only if query uses no function on city
+ checkAnswer(s"select id, rider from $tableName where city =
'sunnyvale'")(
+ Seq("trip2", "rider-C")
+ )
+ metaClient = createMetaClient(spark, basePath)
+ // verify file pruning
+ val targetCity = Literal.create("sunnyvale")
+ val dataFilterCityEquals = EqualTo(cityColumn, targetCity)
+ verifyFilePruning(opts, dataFilterCityEquals, metaClient, isDataSkippingExpected = true)
+ // validate IN query
+ checkAnswer(s"select id, rider from $tableName where city in
('san_diego', 'sunnyvale')")(
+ Seq("trip2", "rider-C"),
+ Seq("trip5", "rider-A"),
+ Seq("trip6", "rider-C")
+ )
+ }
+ }
+ }
+ }
+
private def assertTableIdentifier(catalogTable: CatalogTable,
expectedDatabaseName: String,
expectedTableName: String): Unit = {
@@ -756,6 +853,7 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
val tableName = generateTableName + s"_init_$tableType$isPartitioned"
val partitionByClause = if (isPartitioned) "partitioned by(price)"
else ""
val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ setCompactionConfigs(tableType)
spark.sql(
s"""
|create table $tableName (
@@ -773,7 +871,7 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
| location '$basePath'
""".stripMargin)
- writeRecordsAndValidateFunctionalIndex(tableName, basePath, "update", isDelete = false, shouldCompact = false, shouldCluster = false, shouldRollback = false)
+ writeRecordsAndValidateFunctionalIndex(tableName, basePath, isDelete = false, shouldCompact = false, shouldCluster = false, shouldRollback = false)
}
}
}
@@ -787,6 +885,7 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
val tableName = generateTableName + s"_rollback_$tableType$isPartitioned"
val partitionByClause = if (isPartitioned) "partitioned by(price)"
else ""
val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ setCompactionConfigs(tableType)
spark.sql(
s"""
|create table $tableName (
@@ -804,23 +903,28 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
| location '$basePath'
""".stripMargin)
- writeRecordsAndValidateFunctionalIndex(tableName, basePath, "update", isDelete = false, shouldCompact = false, shouldCluster = false, shouldRollback = true)
+ writeRecordsAndValidateFunctionalIndex(tableName, basePath, isDelete = false, shouldCompact = false, shouldCluster = false, shouldRollback = true)
}
}
}
}
+ private def setCompactionConfigs(tableType: String): Unit = {
+ spark.sql(s"set hoodie.compact.inline= ${if (tableType == "mor") "true"
else "false"}")
+ if (tableType == "mor") {
+ spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+ }
+ }
+
/**
* Write records to the table with the given operation type and do updates or deletes, and then validate functional index.
*/
private def writeRecordsAndValidateFunctionalIndex(tableName: String,
basePath: String,
- operationType: String,
isDelete: Boolean,
shouldCompact: Boolean,
shouldCluster: Boolean,
- shouldRollback: Boolean,
- shouldValidate: Boolean =
true): Unit = {
+ shouldRollback: Boolean):
Unit = {
// a record with from_unixtime(ts, 'yyyy-MM-dd') = 2020-09-26
spark.sql(s"insert into $tableName values(1, 'a1', 1601098924, 10)")
// a record with from_unixtime(ts, 'yyyy-MM-dd') = 2021-09-26
@@ -1034,10 +1138,53 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
assertTrue(prunedPartitionAndFileNames._1.size == 1) // partition
assertTrue(prunedPartitionAndFileNames._2.size == 1) // log file
assertTrue(FSUtils.isLogFile(prunedPartitionAndFileNames._2.head))
+
+ val prunedPartitionAndFileNamesMap = functionalIndexSupport.getPrunedPartitionsAndFileNamesMap(prunedPaths, includeLogFiles = true)
+ assertTrue(prunedPartitionAndFileNamesMap.keySet.size == 1) // partition
+ assertTrue(prunedPartitionAndFileNamesMap.values.head.size == 1) // log file
+ assertTrue(FSUtils.isLogFile(prunedPartitionAndFileNamesMap.values.head.head))
}
}
}
+ test("testGetFunctionalIndexRecordsUsingBloomFilter") {
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.FUNCTIONAL_INDEX_ENABLE_PROP.key -> "true"
+ )
+ val opts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ HoodieWriteConfig.TBL_NAME.key -> "testGetFunctionalIndexRecordsUsingBloomFilter",
+ TABLE_TYPE.key -> "MERGE_ON_READ",
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ PARTITIONPATH_FIELD.key() -> "c8",
+ // setting IndexType to be INMEMORY to simulate Global Index nature
+ HoodieIndexConfig.INDEX_TYPE.key -> HoodieIndex.IndexType.INMEMORY.name()
+ )
+ val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json-partition-pruning").toString
+
+ // NOTE: Schema here is provided for validation that the input date is in the appropriate format
+ val sourceTableSchema: StructType = new StructType()
+ .add("c1", IntegerType)
+ .add("c2", StringType)
+ .add("c3", DecimalType(9, 3))
+ .add("c4", TimestampType)
+ .add("c5", ShortType)
+ .add("c6", DateType)
+ .add("c7", BinaryType)
+ .add("c8", ByteType)
+ var df = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
+ df = df.withColumn(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION, lit("c/d"))
+ .withColumn(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_RELATIVE_FILE_PATH, lit("c/d/123141ab-701b-4ba4-b60b-e6acd9e9103e-0_329-224134-258390_2131313124.parquet"))
+ .withColumn(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE, lit(100))
+ val bloomFilterRecords = SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(df, "c5", HoodieWriteConfig.newBuilder().withPath("a/b").build(), "", "random")
+ // Since there is only one partition file pair there is only one bloom filter record
+ assertEquals(1, bloomFilterRecords.collectAsList().size())
+ assertFalse(bloomFilterRecords.isEmpty)
+ }
+
private def verifyFilePruning(opts: Map[String, String], dataFilter: Expression, metaClient: HoodieTableMetaClient, isDataSkippingExpected: Boolean = false, isNoScanExpected: Boolean = false): Unit = {
// with data skipping
val commonOpts = opts + ("path" -> metaClient.getBasePath.toString)