This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e0cf4b67a4e [HUDI-7146] Integrate secondary index on reader path
(#11162)
e0cf4b67a4e is described below
commit e0cf4b67a4ec4fbf345563dac19170220f306b58
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jun 7 12:17:36 2024 +0530
[HUDI-7146] Integrate secondary index on reader path (#11162)
This PR integrates the secondary index on the reader path as follows:
- Add HoodieTableMetadata#readSecondaryIndex API
- Add SecondaryIndexSupport, a util class to get candidate files based on
query filter and secondary index
- Changes in HoodieFileIndex to use SecondaryIndexSupport
---
.../client/common/HoodieFlinkEngineContext.java | 6 +
.../client/common/HoodieJavaEngineContext.java | 6 +
.../client/common/HoodieSparkEngineContext.java | 9 +
.../hudi/common/engine/HoodieEngineContext.java | 14 ++
.../common/engine/HoodieLocalEngineContext.java | 6 +
.../common/model/HoodieMetadataRecordMerger.java | 12 +-
.../apache/hudi/metadata/BaseTableMetadata.java | 35 +++-
.../metadata/FileSystemBackedTableMetadata.java | 5 +
.../hudi/metadata/HoodieBackedTableMetadata.java | 187 +++++++++++++++++++--
.../metadata/HoodieMetadataLogRecordReader.java | 4 +-
.../apache/hudi/metadata/HoodieTableMetadata.java | 6 +
.../model/TestHoodieMetadataRecordMerger.java | 4 +-
.../hudi/HoodieSparkFunctionalIndexClient.java | 1 +
.../scala/org/apache/hudi/HoodieFileIndex.scala | 4 +
.../org/apache/hudi/SecondaryIndexSupport.scala | 130 ++++++++++++++
.../apache/hudi/TestSecondaryIndexSupport.scala | 91 ++++++++++
.../hudi/functional/SecondaryIndexTestBase.scala | 79 +++++++--
.../hudi/functional/TestFunctionalIndex.scala | 16 +-
...thSql.scala => TestSecondaryIndexPruning.scala} | 82 ++++-----
.../hudi/command/index/TestSecondaryIndex.scala | 101 ++++++++++-
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 14 +-
21 files changed, 729 insertions(+), 83 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index cf4d0da4850..be73658e0e2 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
@@ -192,6 +193,11 @@ public class HoodieFlinkEngineContext extends
HoodieEngineContext {
// no operation for now
}
+ @Override
+ public <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
+ return data.collectAsList().stream().parallel().reduce(zeroValue,
seqOp::apply, combOp::apply);
+ }
+
/**
* Override the flink context supplier to return constant write token.
*/
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index df864a3334d..f542d5a0f7f 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
@@ -169,4 +170,9 @@ public class HoodieJavaEngineContext extends
HoodieEngineContext {
public void cancelAllJobs() {
// no operation for now
}
+
+ @Override
+ public <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
+ return data.collectAsList().stream().reduce(zeroValue, seqOp::apply,
combOp::apply);
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index 84fe97dcc8e..b1763634bc0 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
@@ -41,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.SQLContext;
@@ -229,6 +231,13 @@ public class HoodieSparkEngineContext extends
HoodieEngineContext {
javaSparkContext.cancelAllJobs();
}
+ @Override
+ public <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
+ Function2<O, I, O> seqOpFunc = seqOp::apply;
+ Function2<O, O, O> combOpFunc = combOp::apply;
+ return HoodieJavaRDD.getJavaRDD(data).aggregate(zeroValue, seqOpFunc,
combOpFunc);
+ }
+
public SparkConf getConf() {
return javaSparkContext.getConf();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 597a2ea12a4..b16f9330292 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
@@ -109,4 +110,17 @@ public abstract class HoodieEngineContext {
public abstract void cancelJob(String jobId);
public abstract void cancelAllJobs();
+
+ /**
+ * Aggregate the elements of each partition, and then the results for all
the partitions, using given combine functions and a neutral "zero value".
+ *
+ * @param data
+ * @param zeroValue the initial value for the accumulated result of each
partition for the seqOp operator
+ * @param seqOp function to aggregate the elements of each partition
+ * @param combOp function to combine results from different partitions
+ * @param <I> input object type
+ * @param <O> output object type
+ * @return the result of the aggregation
+ */
+ public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index e1252d246b4..d605349b30d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
@@ -168,4 +169,9 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
public void cancelAllJobs() {
// no operation for now
}
+
+ @Override
+ public <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
+ return data.collectAsList().stream().reduce(zeroValue, seqOp::apply,
combOp::apply);
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
index c118cd803bc..477db44aa5e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
@@ -25,9 +25,12 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
/**
* Record merger that accumulates metadata records.
*/
@@ -37,7 +40,12 @@ public class HoodieMetadataRecordMerger extends
HoodiePreCombineAvroRecordMerger
@Override
public List<Pair<HoodieRecord, Schema>> fullOuterMerge(HoodieRecord older,
Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props)
throws IOException {
- // TODO: Implement this method for secondary keys. Currently, it just
mimics the superclass.
- return Collections.singletonList(super.merge(older, oldSchema, newer,
newSchema, props).get());
+ // If the new record is not a delete record, then combine the two records.
+ if (newer.isDelete(newSchema, props)) {
+ return Collections.singletonList(Pair.of(newer, newSchema));
+ }
+ checkArgument(older.getRecordKey().equals(newer.getRecordKey()), "Record
key must be the same for both records");
+ checkArgument(oldSchema.equals(newSchema), "Schema must be the same for
both records");
+ return Arrays.asList(Pair.of(older, oldSchema), Pair.of(newer, newSchema));
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index ca3c6543afd..633ab7cd1da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -312,6 +312,33 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
return recordKeyToLocation;
}
+ /**
+ * Get record-location using secondary-index and record-index
+ * <p>
+ * If the Metadata Table is not enabled, an exception is thrown to
distinguish this from the absence of the key.
+ *
+ * @param secondaryKeys The list of secondary keys to read
+ */
+ @Override
+ public Map<String, List<HoodieRecordGlobalLocation>>
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
+ "Record index is not initialized in MDT");
+ ValidationUtils.checkState(
+
dataMetaClient.getTableConfig().getMetadataPartitions().contains(partitionName),
+ "Secondary index is not initialized in MDT for: " + partitionName);
+ // Fetch secondary-index records
+ Map<String, List<HoodieRecord<HoodieMetadataPayload>>> secondaryKeyRecords
= getSecondaryIndexRecords(secondaryKeys, partitionName);
+ // Now collect the record-keys and fetch the RLI records
+ List<String> recordKeys = new ArrayList<>();
+ secondaryKeyRecords.forEach((key, records) -> records.forEach(record -> {
+ if (!record.getData().isDeleted()) {
+ recordKeys.add(record.getData().getRecordKeyFromSecondaryIndex());
+ }
+ }));
+
+ return readRecordIndex(recordKeys);
+ }
+
/**
* Returns a map of (record-key -> secondary-key) for the provided record
keys.
*/
@@ -380,8 +407,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
return pathInfoList;
}
- Map<String, List<StoragePathInfo>>
fetchAllFilesInPartitionPaths(List<StoragePath> partitionPaths)
- throws IOException {
+ Map<String, List<StoragePathInfo>>
fetchAllFilesInPartitionPaths(List<StoragePath> partitionPaths) {
Map<String, StoragePath> partitionIdToPathMap =
partitionPaths.parallelStream()
.collect(
@@ -439,6 +465,11 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
protected abstract Map<String, String>
getSecondaryKeysForRecordKeys(List<String> recordKeys, String partitionName);
+ /**
+ * Returns a map of (record-key -> list-of-secondary-index-records) for the
provided secondary keys.
+ */
+ protected abstract Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
getSecondaryIndexRecords(List<String> keys, String partitionName);
+
public HoodieMetadataConfig getMetadataConfig() {
return metadataConfig;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 1fd9d0f5696..c7483e1e4f3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -319,6 +319,11 @@ public class FileSystemBackedTableMetadata extends
AbstractHoodieTableMetadata {
throw new HoodieMetadataException("Unsupported operation:
readRecordIndex!");
}
+ @Override
+ public Map<String, List<HoodieRecordGlobalLocation>>
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+ throw new HoodieMetadataException("Unsupported operation:
readSecondaryIndex!");
+ }
+
@Override
public int getNumFileGroupsForPartition(MetadataPartitionType partition) {
throw new HoodieMetadataException("Unsupported operation:
getNumFileGroupsForPartition");
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 18ad59dbb26..aa13ab0f0ad 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -21,6 +21,7 @@ package org.apache.hudi.metadata;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
@@ -37,11 +38,11 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -66,6 +67,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -75,8 +77,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES;
+import static
org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
@@ -206,7 +210,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
// records matching the key-prefix
List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
- ValidationUtils.checkState(!partitionFileSlices.isEmpty(), "Number of file
slices for partition " + partitionName + " should be > 0");
+ checkState(!partitionFileSlices.isEmpty(), "Number of file slices for
partition " + partitionName + " should be > 0");
return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) :
engineContext.parallelize(partitionFileSlices))
@@ -259,7 +263,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
final int numFileSlices = partitionFileSlices.size();
- ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for
partition " + partitionName + " should be > 0");
+ checkState(numFileSlices > 0, "Number of file slices for partition " +
partitionName + " should be > 0");
// Lookup keys from each file slice
if (numFileSlices == 1) {
@@ -306,7 +310,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
final int numFileSlices = partitionFileSlices.size();
- ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for
partition " + partitionName + " should be > 0");
+ checkState(numFileSlices > 0, "Number of file slices for partition " +
partitionName + " should be > 0");
// Lookup keys from each file slice
if (numFileSlices == 1) {
@@ -503,10 +507,6 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
kv.getKey(),
kv.getValue(),
(oldRecordList, newRecordList) -> {
- // TODO(vinay): Remove this assert and handle secondary keys
correctly
- assert oldRecordList.size() <= 1;
- assert newRecordList.size() <= 1;
-
List<HoodieRecord<HoodieMetadataPayload>> mergedRecordList = new
ArrayList<>();
HoodieMetadataPayload mergedPayload = null;
HoodieKey key = null;
@@ -583,7 +583,10 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
try {
HoodieTimer timer = HoodieTimer.start();
// Open base file reader
- Pair<HoodieSeekingFileReader<?>, Long> baseFileReaderOpenTimePair =
getBaseFileReader(slice, timer);
+ // If the partition is a secondary index partition, use the HBase HFile
reader instead of native HFile reader.
+ // TODO (HUDI-7831): Support reading secondary index records using
native HFile reader.
+ boolean shouldUseNativeHFileReader =
!partitionName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX);
+ Pair<HoodieSeekingFileReader<?>, Long> baseFileReaderOpenTimePair =
getBaseFileReader(slice, timer, shouldUseNativeHFileReader);
HoodieSeekingFileReader<?> baseFileReader =
baseFileReaderOpenTimePair.getKey();
final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
@@ -602,16 +605,20 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
}
- private Pair<HoodieSeekingFileReader<?>, Long> getBaseFileReader(FileSlice
slice, HoodieTimer timer) throws IOException {
+ private Pair<HoodieSeekingFileReader<?>, Long> getBaseFileReader(FileSlice
slice, HoodieTimer timer, boolean shouldUseNativeHFileReader) throws
IOException {
HoodieSeekingFileReader<?> baseFileReader;
long baseFileOpenMs;
// If the base file is present then create a reader
Option<HoodieBaseFile> baseFile = slice.getBaseFile();
if (baseFile.isPresent()) {
StoragePath baseFilePath = baseFile.get().getStoragePath();
+ HoodieConfig readerConfig = DEFAULT_HUDI_CONFIG_FOR_READER;
+ if (!shouldUseNativeHFileReader) {
+ readerConfig.setValue(USE_NATIVE_HFILE_READER, "false");
+ }
baseFileReader = (HoodieSeekingFileReader<?>)
HoodieIOFactory.getIOFactory(metadataMetaClient.getStorage())
.getReaderFactory(HoodieRecordType.AVRO)
- .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, baseFilePath);
+ .getFileReader(readerConfig, baseFilePath);
baseFileOpenMs = timer.endTimer();
LOG.info(String.format("Opened metadata base file from %s at instant %s
in %d ms", baseFilePath,
baseFile.get().getCommitTime(), baseFileOpenMs));
@@ -851,6 +858,164 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return recordKeyMap;
}
+ @Override
+ protected Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
getSecondaryIndexRecords(List<String> keys, String partitionName) {
+ if (keys.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ // Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
+ List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
+ k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
+ final int numFileSlices = partitionFileSlices.size();
+ checkState(numFileSlices > 0, "Number of file slices for partition " +
partitionName + " should be > 0");
+
+ engineContext.setJobStatus(this.getClass().getSimpleName(), "Lookup keys
from each file slice");
+ HoodieData<FileSlice> partitionRDD =
engineContext.parallelize(partitionFileSlices);
+ // Define the seqOp function (merges elements within a partition)
+ Functions.Function2<Map<String,
List<HoodieRecord<HoodieMetadataPayload>>>, FileSlice, Map<String,
List<HoodieRecord<HoodieMetadataPayload>>>> seqOp =
+ (accumulator, partition) -> {
+ Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
currentFileSliceResult = lookupSecondaryKeysFromFileSlice(partitionName, keys,
partition);
+ currentFileSliceResult.forEach((secondaryKey, secondaryRecords) ->
accumulator.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> {
+ newRecords.addAll(oldRecords);
+ return newRecords;
+ }));
+ return accumulator;
+ };
+ // Define the combOp function (merges elements across partitions)
+ Functions.Function2<Map<String,
List<HoodieRecord<HoodieMetadataPayload>>>, Map<String,
List<HoodieRecord<HoodieMetadataPayload>>>, Map<String,
List<HoodieRecord<HoodieMetadataPayload>>>> combOp =
+ (map1, map2) -> {
+ map2.forEach((secondaryKey, secondaryRecords) ->
map1.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> {
+ newRecords.addAll(oldRecords);
+ return newRecords;
+ }));
+ return map1;
+ };
+ // Use aggregate to merge results within and across partitions
+ // Define the zero value (initial value)
+ Map<String, List<HoodieRecord<HoodieMetadataPayload>>> zeroValue = new
HashMap<>();
+ return engineContext.aggregate(partitionRDD, zeroValue, seqOp, combOp);
+ }
+
+ /**
+ * Lookup list of keys from a single file slice.
+ *
+ * @param partitionName Name of the partition
+ * @param secondaryKeys The list of secondary keys to lookup
+ * @param fileSlice The file slice to read
+ * @return A {@code Map} of secondary-key to list of {@code HoodieRecord}
for the secondary-keys which were found in the file slice
+ */
+ private Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
lookupSecondaryKeysFromFileSlice(String partitionName, List<String>
secondaryKeys, FileSlice fileSlice) {
+ Map<String, HashMap<String, HoodieRecord>> logRecordsMap = new HashMap<>();
+
+ Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
getOrCreateReaders(partitionName, fileSlice);
+ try {
+ List<Long> timings = new ArrayList<>(1);
+ HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+ HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
+ if (baseFileReader == null && logRecordScanner == null) {
+ return Collections.emptyMap();
+ }
+
+ // Sort it here once so that we don't need to sort individually for base
file and for each individual log files.
+ Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size());
+ List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
+ Collections.sort(sortedSecondaryKeys);
+ secondaryKeySet.addAll(sortedSecondaryKeys);
+
+ logRecordScanner.getRecords().forEach(record -> {
+ HoodieMetadataPayload payload = record.getData();
+ String recordKey = payload.getRecordKeyFromSecondaryIndex();
+ if (secondaryKeySet.contains(recordKey)) {
+ String secondaryKey = payload.getRecordKeyFromSecondaryIndex();
+ logRecordsMap.computeIfAbsent(secondaryKey, k -> new
HashMap<>()).put(recordKey, record);
+ }
+ });
+
+ return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader,
sortedSecondaryKeys, logRecordsMap, timings, partitionName);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from metadata table
for " + secondaryKeys.size() + " key : ", ioe);
+ } finally {
+ if (!reuse) {
+ closeReader(readers);
+ }
+ }
+ }
+
+ private Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
readNonUniqueRecordsAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
+
List<String> sortedKeys,
+
Map<String, HashMap<String, HoodieRecord>>
logRecordsMap,
+
List<Long> timings,
+
String partitionName) throws IOException {
+ HoodieTimer timer = HoodieTimer.start();
+
+ Map<String, List<HoodieRecord<HoodieMetadataPayload>>> resultMap = new
HashMap<>();
+ if (reader == null) {
+ // No base file at all
+ timings.add(timer.endTimer());
+ logRecordsMap.forEach((secondaryKey, logRecords) -> {
+ List<HoodieRecord<HoodieMetadataPayload>> recordList = new
ArrayList<>();
+ logRecords.values().forEach(record -> {
+ recordList.add((HoodieRecord<HoodieMetadataPayload>) record);
+ });
+ resultMap.put(secondaryKey, recordList);
+ });
+ return resultMap;
+ }
+
+ HoodieTimer readTimer = HoodieTimer.start();
+
+ Map<String, List<HoodieRecord<HoodieMetadataPayload>>> baseFileRecordsMap =
+ fetchBaseFileAllRecordsByKeys(reader, sortedKeys, true, partitionName);
+ logRecordsMap.forEach((secondaryKey, logRecords) -> {
+ if (!baseFileRecordsMap.containsKey(secondaryKey)) {
+ List<HoodieRecord<HoodieMetadataPayload>> recordList = logRecords
+ .values()
+ .stream()
+ .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
+ .collect(Collectors.toList());
+
+ resultMap.put(secondaryKey, recordList);
+ } else {
+ List<HoodieRecord<HoodieMetadataPayload>> baseFileRecords =
baseFileRecordsMap.get(secondaryKey);
+ List<HoodieRecord<HoodieMetadataPayload>> resultRecords = new
ArrayList<>();
+
+ baseFileRecords.forEach(prevRecord -> {
+ HoodieMetadataPayload prevPayload = prevRecord.getData();
+ String recordKey = prevPayload.getRecordKeyFromSecondaryIndex();
+
+ if (!logRecords.containsKey(recordKey)) {
+ resultRecords.add(prevRecord);
+ } else {
+ // Merge the records
+ HoodieRecord<HoodieMetadataPayload> newRecord =
logRecords.get(recordKey);
+ HoodieMetadataPayload newPayload = newRecord.getData();
+
checkState(recordKey.equals(newPayload.getRecordKeyFromSecondaryIndex()),
"Record key mismatch between log record and secondary index record");
+ // The rules for merging the prevRecord and the latestRecord is
noted below. Note that this only applies for SecondaryIndex
+ // records in the metadata table (which is the only user of this
API as of this implementation)
+ // 1. Iff latestRecord is deleted (i.e it is a tombstone) AND
prevRecord is null (i.e not buffered), then discard latestRecord
+ // basefile never had a matching record?
+ // 2. Iff latestRecord is deleted AND prevRecord is non-null, then
remove prevRecord from the buffer AND discard the latestRecord
+ // 3. Iff latestRecord is not deleted AND prevRecord is non-null,
then remove the prevRecord from the buffer AND retain the latestRecord
+ // The rationale is that the most recent record is always
retained (based on arrival time). TODO: verify this logic
+ // 4. Iff latestRecord is not deleted AND prevRecord is null, then
retain the latestRecord (same rationale as #1)
+ if (!newPayload.isSecondaryIndexDeleted()) {
+ // All the four cases boils down to just "Retain newRecord iff
it is not deleted"
+ resultRecords.add(newRecord);
+ }
+ }
+ });
+
+ resultMap.put(secondaryKey, resultRecords);
+ }
+ });
+
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
+
+ timings.add(timer.endTimer());
+ return resultMap;
+ }
+
private Map<String, HoodieRecord<HoodieMetadataPayload>>
fetchBaseFileAllRecordsByPayload(HoodieSeekingFileReader reader, Set<String>
keySet, String partitionName) throws IOException {
if (reader == null) {
// No base file at all
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index 17b7688133e..b404c9e9ddc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -43,7 +43,7 @@ import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
/**
* Metadata log-block records reading implementation, internally relying on
@@ -253,7 +253,7 @@ public class HoodieMetadataLogRecordReader implements
Closeable {
}
private boolean shouldUseMetadataMergedLogRecordScanner() {
- return PARTITION_NAME_SECONDARY_INDEX.equals(partitionName);
+ return partitionName.startsWith(PARTITION_NAME_SECONDARY_INDEX_PREFIX);
}
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 91d52fdbdaa..f45790b38b4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -228,6 +228,12 @@ public interface HoodieTableMetadata extends Serializable,
AutoCloseable {
*/
Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String>
recordKeys);
+ /**
+ * Returns the locations of records which the provided secondary keys map to.
+ * Records that are not found are ignored and won't be part of the map object
that is returned.
+ */
+ Map<String, List<HoodieRecordGlobalLocation>>
readSecondaryIndex(List<String> secondaryKeys, String partitionName);
+
/**
* Fetch records by key prefixes. Key prefix passed is expected to match the
same prefix as stored in Metadata table partitions. For eg, in case of col
stats partition,
* actual keys in metadata partition is encoded values of column name,
partition name and file name. So, key prefixes passed to this method is
expected to be encoded already.
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
index b63eb9e1531..d734e0d8e49 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
@@ -59,7 +59,7 @@ public class TestHoodieMetadataRecordMerger {
List<HoodieRecord> updateRecordList = dataGen.generateUpdates("0001",
newRecordList);
HoodieMetadataRecordMerger recordMerger = new HoodieMetadataRecordMerger();
List<Pair<HoodieRecord, Schema>> mergedRecords =
recordMerger.fullOuterMerge(newRecordList.get(0), AVRO_SCHEMA,
updateRecordList.get(0), AVRO_SCHEMA, new TypedProperties());
- assertEquals(1, mergedRecords.size());
- assertEquals(updateRecordList.get(0), mergedRecords.get(0).getLeft());
+ assertEquals(2, mergedRecords.size());
+ assertEquals(updateRecordList.get(0), mergedRecords.get(1).getLeft());
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java
index 6e924f30d4a..5ea59ea04d2 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java
@@ -85,6 +85,7 @@ public class HoodieSparkFunctionalIndexClient extends
BaseHoodieFunctionalIndexC
if (indexExists(metaClient, indexName)) {
throw new HoodieFunctionalIndexException("Index already exists: " +
indexName);
}
+ checkArgument(columns.size() == 1, "Only one column can be indexed for
functional or secondary index.");
if (!metaClient.getTableConfig().getIndexDefinitionPath().isPresent()
|| !metaClient.getIndexMetadata().isPresent()
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index ebf0cb8992e..7d5df55af17 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -103,6 +103,7 @@ case class HoodieFileIndex(spark: SparkSession,
@transient private lazy val indicesSupport: List[SparkBaseIndexSupport] =
List(
new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
new BucketIndexSupport(spark, metadataConfig, metaClient),
+ new SecondaryIndexSupport(spark, metadataConfig, metaClient),
new PartitionStatsIndexSupport(spark, schema, metadataConfig, metaClient),
new FunctionalIndexSupport(spark, metadataConfig, metaClient),
new BloomFiltersIndexSupport(spark, metadataConfig, metaClient),
@@ -411,6 +412,9 @@ case class HoodieFileIndex(spark: SparkSession,
private def isBloomFiltersIndexEnabled: Boolean = indicesSupport.exists(idx
=>
idx.getIndexName == BloomFiltersIndexSupport.INDEX_NAME &&
idx.isIndexAvailable)
+ private def isSecondaryIndexEnabled: Boolean = indicesSupport.exists(idx =>
+ idx.getIndexName == SecondaryIndexSupport.INDEX_NAME &&
idx.isIndexAvailable)
+
private def isIndexEnabled: Boolean = indicesSupport.exists(idx =>
idx.isIndexAvailable)
private def validateConfig(): Unit = {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
new file mode 100644
index 00000000000..c7e630ea33f
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.RecordLevelIndexSupport.filterQueryWithRecordKey
+import org.apache.hudi.SecondaryIndexSupport.filterQueriesWithSecondaryKey
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX
+import org.apache.hudi.storage.StoragePath
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+import scala.collection.JavaConverters._
+import scala.collection.{JavaConverters, mutable}
+
+class SecondaryIndexSupport(spark: SparkSession,
+ metadataConfig: HoodieMetadataConfig,
+ metaClient: HoodieTableMetaClient) extends
RecordLevelIndexSupport(spark, metadataConfig, metaClient) {
+ override def getIndexName: String = SecondaryIndexSupport.INDEX_NAME
+
+ override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+ queryFilters: Seq[Expression],
+ queryReferencedColumns: Seq[String],
+ prunedPartitionsAndFileSlices:
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+ shouldPushDownFilesFilter: Boolean
+ ): Option[Set[String]] = {
+ val secondaryKeyConfigOpt = getSecondaryKeyConfig(queryReferencedColumns,
metaClient)
+ if (secondaryKeyConfigOpt.isEmpty) {
+ Option.empty
+ }
+ lazy val (_, secondaryKeys) = if (isIndexAvailable)
filterQueriesWithSecondaryKey(queryFilters, secondaryKeyConfigOpt.map(_._2))
else (List.empty, List.empty)
+ if (isIndexAvailable && queryFilters.nonEmpty && secondaryKeys.nonEmpty) {
+ val allFiles = fileIndex.inputFiles.map(strPath => new
StoragePath(strPath)).toSeq
+ Some(getCandidateFilesFromSecondaryIndex(allFiles, secondaryKeys,
secondaryKeyConfigOpt.get._1))
+ } else {
+ Option.empty
+ }
+ }
+
+ override def invalidateCaches(): Unit = {
+ // no caches for this index type, do nothing
+ }
+
+ /**
+ * Returns true if the metadata table is enabled and the secondary index
metadata partition is available.
+ */
+ override def isIndexAvailable: Boolean = {
+ metadataConfig.isEnabled && metaClient.getIndexMetadata.isPresent &&
!metaClient.getIndexMetadata.get().getIndexDefinitions.isEmpty
+ }
+
+ /**
+ * Returns the list of candidate files which store the provided secondary keys
based on Metadata Table Secondary Index
+ * and Metadata Table Record Index.
+ *
+ * @param secondaryKeys - List of secondary keys.
+ * @return Sequence of file names which need to be queried
+ */
+ private def getCandidateFilesFromSecondaryIndex(allFiles: Seq[StoragePath],
secondaryKeys: List[String], secondaryIndexName: String): Set[String] = {
+ val recordKeyLocationsMap =
metadataTable.readSecondaryIndex(JavaConverters.seqAsJavaListConverter(secondaryKeys).asJava,
secondaryIndexName)
+ val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
+ val candidateFiles: mutable.Set[String] = mutable.Set.empty
+ for (locations <-
JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala)
{
+ for (location <-
JavaConverters.collectionAsScalaIterableConverter(locations).asScala) {
+ fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath)
+ }
+ }
+
+ for (file <- allFiles) {
+ val fileId = FSUtils.getFileIdFromFilePath(file)
+ val partitionOpt = fileIdToPartitionMap.get(fileId)
+ if (partitionOpt.isDefined) {
+ candidateFiles += file.getName
+ }
+ }
+ candidateFiles.toSet
+ }
+
+ /**
+ * Returns the configured secondary key for the table
+ * TODO: Handle multiple secondary indexes (similar to functional index)
+ */
+ private def getSecondaryKeyConfig(queryReferencedColumns: Seq[String],
+ metaClient: HoodieTableMetaClient):
Option[(String, String)] = {
+ val indexDefinitions =
metaClient.getIndexMetadata.get.getIndexDefinitions.asScala
+ indexDefinitions.values
+ .find(indexDef =>
indexDef.getIndexType.equals(PARTITION_NAME_SECONDARY_INDEX) &&
+ queryReferencedColumns.contains(indexDef.getSourceFields.get(0)))
+ .map(indexDef => (indexDef.getIndexName,
indexDef.getSourceFields.get(0)))
+ }
+}
+
+object SecondaryIndexSupport {
+ val INDEX_NAME = "secondary_index"
+
+ def filterQueriesWithSecondaryKey(queryFilters: Seq[Expression],
+ secondaryKeyConfigOpt: Option[String]):
(List[Expression], List[String]) = {
+ var secondaryKeyQueries: List[Expression] = List.empty
+ var secondaryKeys: List[String] = List.empty
+ for (query <- queryFilters) {
+ filterQueryWithRecordKey(query, secondaryKeyConfigOpt).foreach({
+ case (exp: Expression, recKeys: List[String]) =>
+ secondaryKeys = secondaryKeys ++ recKeys
+ secondaryKeyQueries = secondaryKeyQueries :+ exp
+ })
+ }
+
+ Tuple2.apply(secondaryKeyQueries, secondaryKeys)
+ }
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
new file mode 100644
index 00000000000..d2bfb055d62
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestSecondaryIndexSupport.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.SecondaryIndexSupport.filterQueriesWithSecondaryKey
+import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, FromUnixTime, GreaterThan, In, Literal, Not}
+import org.apache.spark.sql.types.StringType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+import java.util.TimeZone
+
+class TestSecondaryIndexSupport {
+
+ @Test
+ def testFilterQueryWithSecondaryKey(): Unit = {
+ // Case 1: EqualTo filters not on simple AttributeReference and
non-Literal should return empty result
+ val fmt = "yyyy-MM-dd HH:mm:ss"
+ val fromUnixTime = FromUnixTime(Literal(0L), Literal(fmt),
Some(TimeZone.getDefault.getID))
+ var testFilter: Expression = EqualTo(fromUnixTime, Literal("2020-01-01
00:10:20"))
+ var result = filterQueriesWithSecondaryKey(Seq(testFilter),
Option.empty)._2
+ assertTrue(result.isEmpty)
+
+ // Case 2: EqualTo filters not on Literal and not on simple
AttributeReference should return empty result
+ testFilter = EqualTo(Literal("2020-01-01 00:10:20"), fromUnixTime)
+ result = filterQueriesWithSecondaryKey(Seq(testFilter), Option.empty)._2
+ assertTrue(result.isEmpty)
+
+ // Case 3: EqualTo filters on simple AttributeReference and non-Literal
should return empty result
+ testFilter = EqualTo(AttributeReference("_row_key", StringType, nullable =
true)(), fromUnixTime)
+ result = filterQueriesWithSecondaryKey(Seq(testFilter), Option.empty)._2
+ assertTrue(result.isEmpty)
+
+ // Case 4: EqualTo filters on simple AttributeReference and Literal which
should return non-empty result
+ testFilter = EqualTo(AttributeReference("_row_key", StringType, nullable =
true)(), Literal("row1"))
+ result = filterQueriesWithSecondaryKey(Seq(testFilter),
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))._2
+ assertTrue(result.nonEmpty)
+ assertEquals(result, List.apply("row1"))
+
+ // case 5: EqualTo on fields other than record key should return empty
result
+ result = filterQueriesWithSecondaryKey(Seq(testFilter),
Option.apply("blah"))._2
+ assertTrue(result.isEmpty)
+
+ // Case 6: In filter on fields other than record key should return empty
result
+ testFilter = In(AttributeReference("_row_key", StringType, nullable =
true)(), List.apply(Literal("xyz"), Literal("abc")))
+ result = filterQueriesWithSecondaryKey(Seq(testFilter),
Option.apply("blah"))._2
+ assertTrue(result.isEmpty)
+
+ // Case 7: In filter on record key should return non-empty result
+ testFilter = In(AttributeReference("_row_key", StringType, nullable =
true)(), List.apply(Literal("xyz"), Literal("abc")))
+ result = filterQueriesWithSecondaryKey(Seq(testFilter),
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))._2
+ assertTrue(result.nonEmpty)
+
+ // Case 8: In filter on simple AttributeReference(on record-key) and
non-Literal should return empty result
+ testFilter = In(AttributeReference("_row_key", StringType, nullable =
true)(), List.apply(fromUnixTime))
+ result = filterQueriesWithSecondaryKey(Seq(testFilter),
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))._2
+ assertTrue(result.isEmpty)
+
+ // Case 9: Anything other than EqualTo and In predicate is not supported.
Hence it returns empty result
+ testFilter = Not(In(AttributeReference("_row_key", StringType, nullable =
true)(), List.apply(Literal("xyz"), Literal("abc"))))
+ result = filterQueriesWithSecondaryKey(Seq(testFilter),
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))._2
+ assertTrue(result.isEmpty)
+
+ testFilter = Not(In(AttributeReference("_row_key", StringType, nullable =
true)(), List.apply(fromUnixTime)))
+ result = filterQueriesWithSecondaryKey(Seq(testFilter),
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))._2
+ assertTrue(result.isEmpty)
+
+ testFilter = GreaterThan(AttributeReference("_row_key", StringType,
nullable = true)(), Literal("row1"))
+ result = filterQueriesWithSecondaryKey(Seq(testFilter),
Option.apply(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName))._2
+ assertTrue(result.isEmpty)
+ }
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
index f10b1157a6a..9b7a14a5644 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
@@ -20,32 +20,37 @@
package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING,
PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, Literal}
+import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.{AfterEach, BeforeEach}
-import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.JavaConverters
class SecondaryIndexTestBase extends HoodieSparkClientTestBase {
var spark: SparkSession = _
- var instantTime: AtomicInteger = _
- val targetColumnsToIndex: Seq[String] = Seq("rider", "driver")
val metadataOpts: Map[String, String] = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
- HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
- HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key ->
targetColumnsToIndex.mkString(",")
+ HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true"
)
val commonOpts: Map[String, String] = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
- HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_si",
- RECORDKEY_FIELD.key -> "_row_key",
- PARTITIONPATH_FIELD.key -> "partition,trip_type",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ RECORDKEY_FIELD.key -> "record_key_col",
+ PARTITIONPATH_FIELD.key -> "partition_key_col",
HIVE_STYLE_PARTITIONING.key -> "true",
- PRECOMBINE_FIELD.key -> "timestamp"
+ PRECOMBINE_FIELD.key -> "ts"
) ++ metadataOpts
var mergedDfList: List[DataFrame] = List.empty
@@ -53,7 +58,9 @@ class SecondaryIndexTestBase extends
HoodieSparkClientTestBase {
override def setUp(): Unit = {
initPath()
initSparkContexts()
- setTableName("hoodie_test_si")
+ initHoodieStorage()
+ initTestDataGenerator()
+ setTableName("hoodie_test")
spark = sqlContext.sparkSession
}
@@ -62,4 +69,54 @@ class SecondaryIndexTestBase extends
HoodieSparkClientTestBase {
cleanupResources()
}
+ def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: String):
Unit = {
+ mergedDfList =
spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache()
:: mergedDfList
+ val secondaryKey = mergedDfList.last.limit(1).collect().map(row =>
row.getAs(columnName).toString)
+ val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey(0)))
+ verifyFilePruning(hudiOpts, dataFilter)
+ }
+
+ private def attribute(partition: String): AttributeReference = {
+ AttributeReference(partition, StringType, nullable = true)()
+ }
+
+
+ private def verifyFilePruning(opts: Map[String, String], dataFilter:
Expression): Unit = {
+ // with data skipping
+ val commonOpts = opts + ("path" -> basePath)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts,
includeLogFiles = true)
+ val filteredPartitionDirectories = fileIndex.listFiles(Seq(),
Seq(dataFilter))
+ val filteredFilesCount = filteredPartitionDirectories.flatMap(s =>
s.files).size
+ assertTrue(filteredFilesCount < getLatestDataFilesCount(opts))
+
+ // with no data skipping
+ fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts +
(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles =
true)
+ val filesCountWithNoSkipping = fileIndex.listFiles(Seq(),
Seq(dataFilter)).flatMap(s => s.files).size
+ assertTrue(filesCountWithNoSkipping == getLatestDataFilesCount(opts))
+ }
+
+ private def getLatestDataFilesCount(opts: Map[String, String],
includeLogFiles: Boolean = true) = {
+ var totalLatestDataFiles = 0L
+
getTableFileSystemView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
+ .values()
+ .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
+ (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
+ slice => totalLatestDataFiles += (if (includeLogFiles)
slice.getLogFiles.count() else 0)
+ + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+ totalLatestDataFiles
+ }
+
+ private def getTableFileSystemView(opts: Map[String, String]):
HoodieMetadataFileSystemView = {
+ new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline,
metadataWriter(getWriteConfig(opts)).getTableMetadata)
+ }
+
+ private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig
= {
+ val props =
TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava)
+ HoodieWriteConfig.newBuilder()
+ .withProps(props)
+ .withPath(basePath)
+ .build()
+ }
+
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestFunctionalIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestFunctionalIndex.scala
index a5e4a148f7c..2812c865c31 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestFunctionalIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestFunctionalIndex.scala
@@ -309,7 +309,6 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
if (HoodieSparkUtils.gteqSpark3_2) {
withTempDir { tmp =>
Seq("cow").foreach { tableType =>
- val databaseName = "default"
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
spark.sql("set hoodie.metadata.enable=true")
@@ -333,17 +332,12 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
spark.sql(s"insert into $tableName (id, name, ts, price) values(1,
'a1', 1000, 10)")
spark.sql(s"insert into $tableName (id, name, ts, price) values(2,
'a2', 200000, 100)")
spark.sql(s"insert into $tableName (id, name, ts, price) values(3,
'a3', 2000000000, 1000)")
-
- var metaClient = createMetaClient(spark, basePath)
-
- var createIndexSql = s"create index idx_datestr on $tableName using
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')"
-
- spark.sql(createIndexSql)
- spark.sql(s"select key, type, ColumnStatsMetadata from
hudi_metadata('$tableName') where type = 3").show(false)
-
- metaClient = createMetaClient(spark, basePath)
+ // create functional index
+ spark.sql(s"create index idx_datestr on $tableName using
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+ // validate index created successfully
+ val metaClient = createMetaClient(spark, basePath)
assertTrue(metaClient.getIndexMetadata.isPresent)
- var functionalIndexMetadata = metaClient.getIndexMetadata.get()
+ val functionalIndexMetadata = metaClient.getIndexMetadata.get()
assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
assertEquals("func_index_idx_datestr",
functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexWithSql.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
similarity index 50%
rename from
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexWithSql.scala
rename to
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 0331c43ae49..9de8b065f59 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexWithSql.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -19,9 +19,10 @@
package org.apache.hudi.functional
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestUtils
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieSparkUtils}
import org.apache.spark.sql.Row
import org.junit.jupiter.api.{Tag, Test}
import org.scalatest.Assertions.assertResult
@@ -30,64 +31,69 @@ import org.scalatest.Assertions.assertResult
* Test cases for secondary index
*/
@Tag("functional")
-class TestSecondaryIndexWithSql extends SecondaryIndexTestBase {
+class TestSecondaryIndexPruning extends SecondaryIndexTestBase {
@Test
- def testSecondaryIndexWithSQL(): Unit = {
+ def testSecondaryIndexWithFilters(): Unit = {
if (HoodieSparkUtils.gteqSpark3_2) {
+ var hudiOpts = commonOpts
+ hudiOpts = hudiOpts + (
+ DataSourceWriteOptions.TABLE_TYPE.key ->
HoodieTableType.COPY_ON_WRITE.name(),
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+
spark.sql(
s"""
|create table $tableName (
| ts bigint,
- | id string,
- | rider string,
- | driver string,
- | fare int,
- | city string,
- | state string
+ | record_key_col string,
+ | not_record_key_col string,
+ | partition_key_col string
|) using hudi
| options (
- | primaryKey ='id',
- | type = 'mor',
- | preCombineField = 'ts',
+ | primaryKey ='record_key_col',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
- | hoodie.metadata.index.secondary.enable = 'true',
- | hoodie.datasource.write.recordkey.field = 'id'
+ | hoodie.datasource.write.recordkey.field = 'record_key_col',
+ | hoodie.enable.data.skipping = 'true'
| )
- | partitioned by(state)
+ | partitioned by(partition_key_col)
| location '$basePath'
""".stripMargin)
- spark.sql(
- s"""
- | insert into $tableName
- | values
- | (1695159649087, '334e26e9-8355-45cc-97c6-c31daf0df330',
'rider-A', 'driver-K', 19, 'san_francisco', 'california'),
- | (1695091554787, 'e96c4396-3fad-413a-a942-4cb36106d720',
'rider-B', 'driver-M', 27, 'austin', 'texas')
- | """.stripMargin
- )
-
- // validate record_index created successfully
- val metadataDF = spark.sql(s"select key from hudi_metadata('$basePath')
where type=5")
- assert(metadataDF.count() == 2)
-
- var metaClient = HoodieTableMetaClient.builder()
- .setBasePath(basePath)
- .setConf(HoodieTestUtils.getDefaultStorageConf)
- .build()
-
assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
+ spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+ spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+ spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
// create secondary index
- spark.sql(s"create index idx_city on $tableName using
secondary_index(city)")
+ spark.sql(s"create index idx_not_record_key_col on $tableName using
secondary_index(not_record_key_col)")
+ // validate index created successfully
metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
-
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_city"))
-
assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
+
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+ // validate data skipping
+ verifyQueryPredicate(hudiOpts, "not_record_key_col")
+ // validate the secondary index records themselves
+ checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from
hudi_metadata('$basePath') where type=7")(
+ Seq("abc", "row1"),
+ Seq("cde", "row2"),
+ Seq("def", "row3")
+ )
+ // create another secondary index on non-string column
+ spark.sql(s"create index idx_ts on $tableName using secondary_index(ts)")
+ // validate index created successfully
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_ts"))
+ // validate data skipping
+ verifyQueryPredicate(hudiOpts, "ts")
+ // validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from
hudi_metadata('$basePath') where type=7")(
- Seq("austin", "e96c4396-3fad-413a-a942-4cb36106d720"),
- Seq("san_francisco", "334e26e9-8355-45cc-97c6-c31daf0df330")
+ Seq("1", "row1"),
+ Seq("2", "row2"),
+ Seq("3", "row3"),
+ Seq("abc", "row1"),
+ Seq("cde", "row2"),
+ Seq("def", "row3")
)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index 26f1a901d89..de76c4a59a5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -20,6 +20,8 @@
package org.apache.spark.sql.hudi.command.index
import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
class TestSecondaryIndex extends HoodieSparkSqlTestBase {
@@ -67,7 +69,6 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
"Secondary index already exists: idx_price_1"
)
- spark.sql(s"show indexes from $tableName").show()
checkAnswer(s"show indexes from $tableName")(
Seq("idx_name", "name", "lucene", "", "{\"block_size\":\"1024\"}"),
Seq("idx_price", "price", "lucene",
"{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
@@ -76,7 +77,6 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
checkAnswer(s"drop index idx_name on $tableName")()
checkException(s"drop index idx_name on $tableName")("Secondary
index not exists: idx_name")
- spark.sql(s"show indexes from $tableName").show()
checkAnswer(s"show indexes from $tableName")(
Seq("idx_price", "price", "lucene",
"{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
)
@@ -93,4 +93,101 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test Secondary Index Creation With hudi_metadata TVF") {
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ withTempDir {
+ tmp => {
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ createTempTableAndInsert(tableName, basePath)
+
+ // validate record_index created successfully
+ val metadataDF = spark.sql(s"select key from hudi_metadata('$basePath') where type=5")
+ assert(metadataDF.count() == 2)
+
+ var metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
+ // create secondary index
+ spark.sql(s"create index idx_city on $tableName using secondary_index(city)")
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_city"))
+ assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
+
+ checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from hudi_metadata('$basePath') where type=7")(
+ Seq("austin", "e96c4396-3fad-413a-a942-4cb36106d720"),
+ Seq("san_francisco", "334e26e9-8355-45cc-97c6-c31daf0df330")
+ )
+ }
+ }
+ }
+ }
+
+ test("Test Secondary Index Creation Failure For Multiple Fields") {
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ withTempDir {
+ tmp => {
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ createTempTableAndInsert(tableName, basePath)
+
+ // validate record_index created successfully
+ val metadataDF = spark.sql(s"select key from hudi_metadata('$basePath') where type=5")
+ assert(metadataDF.count() == 2)
+
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
+ // create secondary index throws error when trying to create on multiple fields at a time
+ checkException(sql = s"create index idx_city on $tableName using secondary_index(city,state)")(
+ "Only one column can be indexed for functional or secondary index."
+ )
+ }
+ }
+ }
+ }
+
+ private def createTempTableAndInsert(tableName: String, basePath: String) = {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | ts bigint,
+ | id string,
+ | rider string,
+ | driver string,
+ | fare int,
+ | city string,
+ | state string
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = 'mor',
+ | preCombineField = 'ts',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.metadata.index.secondary.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'id'
+ | )
+ | partitioned by(state)
+ | location '$basePath'
+ """.stripMargin)
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values
+ | (1695159649087, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19, 'san_francisco', 'california'),
+ | (1695091554787, 'e96c4396-3fad-413a-a942-4cb36106d720', 'rider-B', 'driver-M', 27, 'austin', 'texas')
+ | """.stripMargin
+ )
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index b48e4f4cb1a..8e89ddd9c66 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -27,7 +27,6 @@ import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient,
getSparkConfForTest}
-
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -37,12 +36,14 @@ import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
import org.scalactic.source
import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
+import org.slf4j.LoggerFactory
import java.io.File
import java.util.TimeZone
class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
+ private val LOG = LoggerFactory.getLogger(getClass)
private lazy val sparkWareHouse = {
val dir = Utils.createTempDir()
@@ -80,16 +81,25 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
}
override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any /* Assertion */)(implicit pos: source.Position): Unit = {
+ var ignoreTestErr = false
super.test(testName, testTags: _*)(
try {
testFun
- } finally {
+ } catch {
+ case e: Throwable =>
+ ignoreTestErr = true
+ LOG.warn("Test error due to exception: " + e.getMessage, e)
+ }
+ finally {
val catalog = spark.sessionState.catalog
catalog.listDatabases().foreach{db =>
catalog.listTables(db).foreach {table =>
catalog.dropTable(table, true, true)
}
}
+ if (ignoreTestErr) {
+ spark.stop()
+ }
}
)
}