This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new c9f4a0fc8bb [HUDI-8587] Supporting schema evolution with partition stats index (#12714)
c9f4a0fc8bb is described below
commit c9f4a0fc8bb20f823cf7029f46188416fd3d104f
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Jan 28 06:54:52 2025 -0800
[HUDI-8587] Supporting schema evolution with partition stats index (#12714)
---
.../hudi/client/HoodieColumnStatsIndexUtils.java | 5 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 2 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 4 +-
.../client/utils/SparkMetadataWriterUtils.java | 15 +-
.../SparkHoodieBackedTableMetadataWriter.java | 3 +-
.../apache/hudi/common/util/FileFormatUtils.java | 45 +++-
.../hudi/metadata/HoodieTableMetadataUtil.java | 242 ++++++++++++++-------
.../apache/hudi/common/util/TestBaseFileUtils.java | 8 +-
.../apache/hudi/table/ITTestSchemaEvolution.java | 2 +-
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 82 ++++---
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 6 +-
.../TestColStatsRecordWithMetadataRecord.java | 152 ++++++++++++-
.../hudi/functional/ColumnStatIndexTestBase.scala | 11 +-
.../hudi/functional/TestBasicSchemaEvolution.scala | 2 +-
.../apache/hudi/functional/TestMORDataSource.scala | 12 +-
...TestHoodieDeltaStreamerSchemaEvolutionBase.java | 2 -
16 files changed, 442 insertions(+), 151 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
index adcbd3c2e4a..238e64bac1a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
@@ -32,6 +32,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.table.HoodieTable;
+import java.util.ArrayList;
 import java.util.List;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
@@ -66,8 +67,8 @@ public class HoodieColumnStatsIndexUtils {
           HoodieCommitMetadata.class);
       if (mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) {
         // update data table's table config for list of columns indexed.
-        List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig(),
-            Option.of(config.getRecordMerger().getRecordType()));
+        List<String> columnsToIndex = new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig(),
+            Option.of(config.getRecordMerger().getRecordType())).keySet());
         // if col stats is getting updated, let's also update the list of columns indexed if it changed.
         updateColSatsFunc.apply(dataTable.getMetaClient(), columnsToIndex);
       }
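
For illustration, a minimal standalone sketch (assumed names; not part of this commit) of the caller-side pattern this change introduces: getColumnsToIndex now returns a Map of column name to Avro schema, and callers that only need the names wrap keySet() in a list.

    import org.apache.avro.Schema;

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ColumnsToIndexSketch {
      public static void main(String[] args) {
        // What getColumnsToIndex now conceptually returns: name -> field schema.
        Map<String, Schema> columnsToIndex = new LinkedHashMap<>();
        columnsToIndex.put("_hoodie_commit_time", Schema.create(Schema.Type.STRING));
        columnsToIndex.put("price", Schema.create(Schema.Type.LONG));
        // Callers that previously consumed a List<String> adapt via keySet().
        List<String> columnNames = new ArrayList<>(columnsToIndex.keySet());
        System.out.println(columnNames); // [_hoodie_commit_time, price]
      }
    }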
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 3c3295be486..e1103a8bd4f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -438,7 +438,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     Set<String> columnsToIndexSet = new HashSet<>(HoodieTableMetadataUtil
         .getColumnsToIndex(hoodieTable.getMetaClient().getTableConfig(), config.getMetadataConfig(), Lazy.eagerly(Option.of(writeSchemaWithMetaFields)),
-        Option.of(this.recordMerger.getRecordType())));
+        Option.of(this.recordMerger.getRecordType())).keySet());
     final List<Pair<String, Schema.Field>> fieldsToIndex = columnsToIndexSet.stream()
         .map(fieldName -> HoodieAvroUtils.getSchemaForField(writeSchemaWithMetaFields, fieldName)).collect(Collectors.toList());
     try {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index cd48b293d0d..9dafe3bad09 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -542,9 +542,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
   private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
     // Find the columns to index
     Lazy<Option<Schema>> tableSchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
-    final List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
+    final List<String> columnsToIndex = new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
         dataWriteConfig.getMetadataConfig(), tableSchema, true,
-        Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
+        Option.of(dataWriteConfig.getRecordMerger().getRecordType())).keySet());
     final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
     if (columnsToIndex.isEmpty()) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 8b9d51c252c..15069004f56 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.client.utils;
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -368,7 +369,8 @@ public class SparkMetadataWriterUtils {
   */
  public static HoodiePairData<String, List<HoodieColumnRangeMetadata<Comparable>>> getExpressionIndexPartitionStatUpdates(HoodieCommitMetadata commitMetadata, String indexPartition,
                                                                                                                           HoodieEngineContext engineContext, HoodieTableMetadata tableMetadata,
-                                                                                                                          HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig) {
+                                                                                                                          HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig,
+                                                                                                                          Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
    // In this function we iterate over all the partitions modified by the commit and fetch the latest files in those partitions.
    // We fetch the stored expression index records for these latest files and return HoodiePairData of partition name and list of column range metadata for these files.
@@ -385,9 +387,14 @@ public class SparkMetadataWriterUtils {
     HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
     Schema tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema)
         .orElseThrow(() -> new IllegalStateException(String.format("Expected writer schema in commit metadata %s", commitMetadata)));
-    // filter columns with only supported types
-    final List<String> validColumnsToIndex = columnsToIndex.stream()
-        .filter(col -> HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) || HoodieTableMetadataUtil.validateDataTypeForPartitionStats(col, tableSchema))
+    List<Pair<String, Schema>> columnsToIndexSchemaMap = columnsToIndex.stream()
+        .map(columnToIndex -> Pair.of(columnToIndex, HoodieAvroUtils.getSchemaForField(tableSchema, columnToIndex).getValue().schema()))
+        .collect(Collectors.toList());
+    // filter for supported types
+    final List<String> validColumnsToIndex = columnsToIndexSchemaMap.stream()
+        .filter(colSchemaPair -> HoodieTableMetadataUtil.SUPPORTED_META_FIELDS_PARTITION_STATS.contains(colSchemaPair.getKey())
+            || HoodieTableMetadataUtil.isColumnTypeSupported(colSchemaPair.getValue(), recordTypeOpt))
+        .map(entry -> entry.getKey())
         .collect(Collectors.toList());
     if (validColumnsToIndex.isEmpty()) {
       return engineContext.emptyHoodieData().mapToPair(o -> Pair.of("", new ArrayList<>()));
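
A condensed standalone sketch (assumed helper name; not Hudi code) of the filter above: walk the table's fields, check each field's schema against the support rule, and keep only the passing column names.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    import java.util.List;
    import java.util.stream.Collectors;

    public class FilterSupportedColumnsSketch {
      // Simplified stand-in for HoodieTableMetadataUtil.isColumnTypeSupported.
      static boolean isSupported(Schema s) {
        Schema.Type t = s.getType();
        return t != Schema.Type.RECORD && t != Schema.Type.ARRAY
            && t != Schema.Type.MAP && t != Schema.Type.ENUM;
      }

      public static void main(String[] args) {
        Schema tableSchema = SchemaBuilder.record("rec").fields()
            .requiredString("name")
            .name("tags").type().array().items().stringType().noDefault()
            .endRecord();
        // Keep only columns whose field schema passes the support check.
        List<String> validColumns = tableSchema.getFields().stream()
            .filter(f -> isSupported(f.schema()))
            .map(Schema.Field::name)
            .collect(Collectors.toList());
        System.out.println(validColumns); // [name]
      }
    }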
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index a0b1d5a0b0a..863a4995f6e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -177,7 +177,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     if (isExprIndexUsingColumnStats) {
       // Fetch column range metadata for affected partitions in the commit
       HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>> exprIndexPartitionStatUpdates =
-          SparkMetadataWriterUtils.getExpressionIndexPartitionStatUpdates(commitMetadata, indexPartition, engineContext, getTableMetadata(), dataMetaClient, dataWriteConfig.getMetadataConfig())
+          SparkMetadataWriterUtils.getExpressionIndexPartitionStatUpdates(commitMetadata, indexPartition, engineContext, getTableMetadata(), dataMetaClient, dataWriteConfig.getMetadataConfig(),
+              Option.of(dataWriteConfig.getRecordMerger().getRecordType()))
               .flatMapValues(List::iterator);
       // The function below merges the column range metadata from the updated data with the latest column range metadata of the affected partition computed above
       partitionRecordsFunctionOpt = Option.of(rangeMetadata ->
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
index 72fe9f0f4d0..3e295f34d3f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
@@ -39,6 +40,7 @@ import org.apache.avro.generic.GenericRecord;
 import javax.annotation.Nonnull;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -59,14 +61,53 @@ public abstract class FileFormatUtils {
    * @param fileColumnRanges List of column range statistics for each file in a partition
    */
  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInPartition(String relativePartitionPath,
-                                                                                                @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) {
+                                                                                                @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges,
+                                                                                                Map<String, Schema> colsToIndexSchemaMap) {
    ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), "fileColumnRanges should not be empty.");
+    // Do one pass and deduce all columns that need to go through schema evolution.
+    Map<String, Set<Class<?>>> schemaSeenForColsToIndex = new HashMap<>();
+    Set<String> colsWithSchemaEvolved = new HashSet<>();
+    fileColumnRanges.forEach(entry -> {
+      String colToIndex = entry.getColumnName();
+      Class<?> minValueClass = entry.getMinValue() != null ? entry.getMinValue().getClass() : null;
+      Class<?> maxValueClass = entry.getMaxValue() != null ? entry.getMaxValue().getClass() : null;
+      schemaSeenForColsToIndex.computeIfAbsent(colToIndex, s -> new HashSet<>());
+      if (minValueClass != null) {
+        schemaSeenForColsToIndex.get(colToIndex).add(minValueClass);
+      }
+      if (maxValueClass != null) {
+        schemaSeenForColsToIndex.get(colToIndex).add(maxValueClass);
+      }
+      if (!colsToIndexSchemaMap.isEmpty() && schemaSeenForColsToIndex.get(colToIndex).size() > 1) {
+        colsWithSchemaEvolved.add(colToIndex);
+      }
+    });
+
    // There are multiple files. Compute min(file_mins) and max(file_maxs)
    return fileColumnRanges.stream()
        .map(e -> HoodieColumnRangeMetadata.create(
            relativePartitionPath, e.getColumnName(), e.getMinValue(), e.getMaxValue(),
            e.getNullCount(), e.getValueCount(), e.getTotalSize(), e.getTotalUncompressedSize()))
-        .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
+        .reduce((a, b) -> {
+          if (colsWithSchemaEvolved.isEmpty() || colsToIndexSchemaMap.isEmpty()
+              || a.getMinValue() == null || a.getMaxValue() == null || b.getMinValue() == null || b.getMaxValue() == null
+              || !colsWithSchemaEvolved.contains(a.getColumnName())) {
+            return HoodieColumnRangeMetadata.merge(a, b);
+          } else {
+            // The schema is evolving for the column of interest: coerce both sides to the latest schema before merging.
+            Schema schema = colsToIndexSchemaMap.get(a.getColumnName());
+            HoodieColumnRangeMetadata<T> left = HoodieColumnRangeMetadata.create(a.getFilePath(), a.getColumnName(),
+                (T) HoodieTableMetadataUtil.coerceToComparable(schema, a.getMinValue()),
+                (T) HoodieTableMetadataUtil.coerceToComparable(schema, a.getMaxValue()), a.getNullCount(),
+                a.getValueCount(), a.getTotalSize(), a.getTotalUncompressedSize());
+            HoodieColumnRangeMetadata<T> right = HoodieColumnRangeMetadata.create(b.getFilePath(), b.getColumnName(),
+                (T) HoodieTableMetadataUtil.coerceToComparable(schema, b.getMinValue()),
+                (T) HoodieTableMetadataUtil.coerceToComparable(schema, b.getMaxValue()), b.getNullCount(),
+                b.getValueCount(), b.getTotalSize(), b.getTotalUncompressedSize());
+            return HoodieColumnRangeMetadata.merge(left, right);
+          }
+        }).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
  }

  /**
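
To make the two-pass logic above concrete, here is a self-contained sketch (not the Hudi implementation) of the same idea: record every runtime class seen among a column's min/max values, flag the column as evolved when more than one class appears, and coerce values to the target type before reducing.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class SchemaEvolutionMergeSketch {
      public static void main(String[] args) {
        // Min values gathered from three files; the column evolved int -> long.
        Object[] mins = {1, 2L, 3};
        Map<String, Set<Class<?>>> seen = new HashMap<>();
        for (Object min : mins) {
          seen.computeIfAbsent("price", k -> new HashSet<>()).add(min.getClass());
        }
        boolean evolved = seen.get("price").size() > 1;
        System.out.println("evolved: " + evolved); // true
        if (evolved) {
          // Coerce everything to the evolved type (long) before taking the min.
          long globalMin = Arrays.stream(mins)
              .mapToLong(v -> ((Number) v).longValue())
              .min()
              .getAsLong();
          System.out.println("coerced min: " + globalMin); // 1
        }
      }
    }

Without the coercion step, the reduce would eventually compare an Integer against a Long and throw a ClassCastException, which appears to be the failure mode this change guards against.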
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 194c6250950..91c525eb488 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -129,11 +129,13 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -770,8 +772,8 @@ public class HoodieTableMetadataUtil {
       deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
     });
-    List<String> columnsToIndex = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
-        Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)), false, recordTypeOpt);
+    List<String> columnsToIndex = new ArrayList<>(getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)), false, recordTypeOpt).keySet());
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
@@ -1474,12 +1476,12 @@ public class HoodieTableMetadataUtil {
     }
     try {
-      List<String> columnsToIndex = getColumnsToIndex(commitMetadata, dataMetaClient, metadataConfig, recordTypeOpt);
-      if (columnsToIndex.isEmpty()) {
+      Map<String, Schema> columnsToIndexSchemaMap = getColumnsToIndex(commitMetadata, dataMetaClient, metadataConfig, recordTypeOpt);
+      if (columnsToIndexSchemaMap.isEmpty()) {
         // In case there are no columns to index, bail
         return engineContext.emptyHoodieData();
       }
-
+      List<String> columnsToIndex = new ArrayList<>(columnsToIndexSchemaMap.keySet());
       int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getColumnStatsIndexParallelism()), 1);
       return engineContext.parallelize(allWriteStats, parallelism)
           .flatMap(writeStat ->
@@ -1489,7 +1491,7 @@ public class HoodieTableMetadataUtil {
     }
   }
-  public static List<String> getColumnsToIndex(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient,
+  public static Map<String, Schema> getColumnsToIndex(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient,
                                                HoodieMetadataConfig metadataConfig, Option<HoodieRecordType> recordTypeOpt) {
     Option<Schema> writerSchema = Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
@@ -1512,9 +1514,15 @@ public class HoodieTableMetadataUtil {
   static final String[] META_COLS_TO_ALWAYS_INDEX = {COMMIT_TIME_METADATA_FIELD, RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD};
   @VisibleForTesting
   public static final Set<String> META_COL_SET_TO_INDEX = new HashSet<>(Arrays.asList(META_COLS_TO_ALWAYS_INDEX));
+  @VisibleForTesting
+  static final Map<String, Schema> META_COLS_TO_ALWAYS_INDEX_SCHEMA_MAP = new TreeMap<String, Schema>() {{
+      put(COMMIT_TIME_METADATA_FIELD, Schema.create(Schema.Type.STRING));
+      put(RECORD_KEY_METADATA_FIELD, Schema.create(Schema.Type.STRING));
+      put(PARTITION_PATH_METADATA_FIELD, Schema.create(Schema.Type.STRING));
+    }};
   @VisibleForTesting
-  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+  public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig tableConfig,
                                                HoodieMetadataConfig metadataConfig,
                                                Lazy<Option<Schema>> tableSchemaLazyOpt,
                                                Option<HoodieRecordType> recordType) {
@@ -1522,7 +1530,7 @@ public class HoodieTableMetadataUtil {
   }
   @VisibleForTesting
-  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+  public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig tableConfig,
                                                HoodieMetadataConfig metadataConfig,
                                                Lazy<Option<Schema>> tableSchemaLazyOpt,
                                                boolean isTableInitializing) {
@@ -1530,17 +1538,20 @@ public class HoodieTableMetadataUtil {
   }
   @VisibleForTesting
-  public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+  public static Map<String, Schema> getColumnsToIndex(HoodieTableConfig tableConfig,
                                                HoodieMetadataConfig metadataConfig,
                                                Lazy<Option<Schema>> tableSchemaLazyOpt,
                                                boolean isTableInitializing,
                                                Option<HoodieRecordType> recordType) {
-    Stream<String> columnsToIndexWithoutRequiredMetas = getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchemaLazyOpt, isTableInitializing, recordType);
+    Map<String, Schema> columnsToIndexWithoutRequiredMetas = getColumnsToIndexWithoutRequiredMetaFields(metadataConfig, tableSchemaLazyOpt, isTableInitializing, recordType);
     if (!tableConfig.populateMetaFields()) {
-      return columnsToIndexWithoutRequiredMetas.collect(Collectors.toList());
+      return columnsToIndexWithoutRequiredMetas;
     }
-    return Stream.concat(Arrays.stream(META_COLS_TO_ALWAYS_INDEX), columnsToIndexWithoutRequiredMetas).collect(Collectors.toList());
+    Map<String, Schema> colsToIndexSchemaMap = new LinkedHashMap<>();
+    colsToIndexSchemaMap.putAll(META_COLS_TO_ALWAYS_INDEX_SCHEMA_MAP);
+    colsToIndexSchemaMap.putAll(columnsToIndexWithoutRequiredMetas);
+    return colsToIndexSchemaMap;
   }
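
A quick standalone illustration (assumed values; not part of this commit) of the ordering the LinkedHashMap above preserves: the always-indexed meta columns land first, followed by the remaining columns in the order they were resolved.

    import org.apache.avro.Schema;

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class OrderedColumnsSketch {
      public static void main(String[] args) {
        Map<String, Schema> metaCols = new LinkedHashMap<>();
        metaCols.put("_hoodie_commit_time", Schema.create(Schema.Type.STRING));
        Map<String, Schema> tableCols = new LinkedHashMap<>();
        tableCols.put("price", Schema.create(Schema.Type.LONG));
        // Same merge order as above: meta columns first, then table columns.
        Map<String, Schema> combined = new LinkedHashMap<>();
        combined.putAll(metaCols);
        combined.putAll(tableCols);
        System.out.println(combined.keySet()); // [_hoodie_commit_time, price]
      }
    }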
  /**
@@ -1555,7 +1566,7 @@ public class HoodieTableMetadataUtil {
   * @param recordType Option of record type. Used to determine which types are valid to index
   * @return list of columns that should be indexed
   */
-  private static Stream<String> getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig,
+  private static Map<String, Schema> getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig,
                                                                            Lazy<Option<Schema>> tableSchemaLazyOpt,
                                                                            boolean isTableInitializing,
                                                                            Option<HoodieRecordType> recordType) {
@@ -1563,41 +1574,41 @@ public class HoodieTableMetadataUtil {
     if (!columnsToIndex.isEmpty()) {
       // if explicitly overridden
       if (isTableInitializing) {
-        return columnsToIndex.stream();
+        Map<String, Schema> toReturn = new LinkedHashMap<>();
+        columnsToIndex.forEach(colName -> toReturn.put(colName, null));
+        return toReturn;
       }
       ValidationUtils.checkArgument(tableSchemaLazyOpt.get().isPresent(), "Table schema not found for the table while computing col stats");
       // filter for eligible fields
       Option<Schema> tableSchema = tableSchemaLazyOpt.get();
-      return columnsToIndex.stream().filter(fieldName -> !META_COL_SET_TO_INDEX.contains(fieldName))
-          .filter(fieldName -> {
-            if (tableSchema.isPresent()) {
-              return isColumnTypeSupported(HoodieAvroUtils.getSchemaForField(tableSchema.get(), fieldName).getValue().schema(), recordType);
-            } else {
-              if (isTableInitializing) {
-                return true;
-              } else {
-                throw new HoodieIOException("Table schema not found to find eligible cols to index");
-              }
-            }
-          });
+      Map<String, Schema> colsToIndexSchemaMap = new LinkedHashMap<>();
+      columnsToIndex.stream().filter(fieldName -> !META_COL_SET_TO_INDEX.contains(fieldName))
+          .map(colName -> Pair.of(colName, HoodieAvroUtils.getSchemaForField(tableSchema.get(), colName).getRight().schema()))
+          .filter(fieldNameSchemaPair -> isColumnTypeSupported(fieldNameSchemaPair.getValue(), recordType))
+          .forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), entry.getValue()));
+      return colsToIndexSchemaMap;
     }
     // if not overridden
     if (tableSchemaLazyOpt.get().isPresent()) {
-      return tableSchemaLazyOpt.get().map(schema -> getFirstNSupportedFieldNames(schema, metadataConfig.maxColumnsToIndexForColStats(), recordType)).orElse(Stream.empty());
+      Map<String, Schema> colsToIndexSchemaMap = new LinkedHashMap<>();
+      tableSchemaLazyOpt.get().map(schema -> getFirstNSupportedFields(schema, metadataConfig.maxColumnsToIndexForColStats(), recordType)).orElse(Stream.empty())
+          .forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), entry.getValue()));
+      return colsToIndexSchemaMap;
     } else if (isTableInitializing) {
-      return Stream.empty();
+      return Collections.emptyMap();
     } else {
       throw new HoodieMetadataException("Cannot initialize col stats with empty list of cols");
     }
   }
-  private static Stream<String> getFirstNSupportedFieldNames(Schema tableSchema, int n, Option<HoodieRecordType> recordType) {
-    return getFirstNFieldNames(tableSchema.getFields().stream()
-        .filter(field -> isColumnTypeSupported(field.schema(), recordType)).map(Schema.Field::name), n);
+  private static Stream<Pair<String, Schema>> getFirstNSupportedFields(Schema tableSchema, int n, Option<HoodieRecordType> recordType) {
+    return getFirstNFields(tableSchema.getFields().stream()
+        .filter(field -> isColumnTypeSupported(field.schema(), recordType)).map(field -> Pair.of(field.name(), field.schema())), n);
   }
-  private static Stream<String> getFirstNFieldNames(Stream<String> fieldNames, int n) {
-    return fieldNames.filter(fieldName -> !HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName)).limit(n);
+  private static Stream<Pair<String, Schema>> getFirstNFields(Stream<Pair<String, Schema>> fieldSchemaPairStream, int n) {
+    return fieldSchemaPairStream.filter(fieldSchemaPair -> !HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldSchemaPair.getKey())).limit(n);
   }
  private static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
@@ -1755,7 +1766,7 @@ public class HoodieTableMetadataUtil {
   * NOTE: This method has to stay compatible with the semantic of
   * {@link FileFormatUtils#readColumnStatsFromMetadata} as they are used in tandem
   */
-  private static Comparable<?> coerceToComparable(Schema schema, Object val) {
+  public static Comparable<?> coerceToComparable(Schema schema, Object val) {
     if (val == null) {
       return null;
     }
@@ -1780,7 +1791,7 @@ public class HoodieTableMetadataUtil {
           // depending on the Avro version. Hence, we simply cast it to {@code Comparable<?>}
           return (Comparable<?>) val;
         }
-        return (Integer) val;
+        return castToInteger(val);
       case LONG:
         if (schema.getLogicalType() == LogicalTypes.timeMicros()
@@ -1790,17 +1801,18 @@ public class HoodieTableMetadataUtil {
           // depending on the Avro version. Hence, we simply cast it to {@code Comparable<?>}
           return (Comparable<?>) val;
         }
-        return (Long) val;
+        return castToLong(val);
       case STRING:
         // unpack the avro Utf8 if possible
         return val.toString();
       case FLOAT:
+        return castToFloat(val);
       case DOUBLE:
+        return castToDouble(val);
       case BOOLEAN:
         return (Comparable<?>) val;
-
       // TODO add support for those types
       case ENUM:
       case MAP:
@@ -1814,16 +1826,97 @@ public class HoodieTableMetadataUtil {
     }
   }
-  private static boolean isColumnTypeSupported(Schema schema, Option<HoodieRecordType> recordType) {
+  private static Integer castToInteger(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return (Integer) val;
+    } else if (val instanceof Long) {
+      return ((Long) val).intValue();
+    } else if (val instanceof Float) {
+      return ((Float) val).intValue();
+    } else if (val instanceof Double) {
+      return ((Double) val).intValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1 : 0;
+    } else {
+      // best effort casting
+      return Integer.parseInt(val.toString());
+    }
+  }
+
+  private static Long castToLong(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).longValue();
+    } else if (val instanceof Long) {
+      return ((Long) val);
+    } else if (val instanceof Float) {
+      return ((Float) val).longValue();
+    } else if (val instanceof Double) {
+      return ((Double) val).longValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1L : 0L;
+    } else {
+      // best effort casting
+      return Long.parseLong(val.toString());
+    }
+  }
+
+  private static Float castToFloat(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).floatValue();
+    } else if (val instanceof Long) {
+      return ((Long) val).floatValue();
+    } else if (val instanceof Float) {
+      return ((Float) val).floatValue();
+    } else if (val instanceof Double) {
+      return ((Double) val).floatValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1.0f : 0.0f;
+    } else {
+      // best effort casting
+      return Float.parseFloat(val.toString());
+    }
+  }
+
+  private static Double castToDouble(Object val) {
+    if (val == null) {
+      return null;
+    }
+    if (val instanceof Integer) {
+      return ((Integer) val).doubleValue();
+    } else if (val instanceof Long) {
+      return ((Long) val).doubleValue();
+    } else if (val instanceof Float) {
+      return ((Float) val).doubleValue();
+    } else if (val instanceof Double) {
+      return ((Double) val).doubleValue();
+    } else if (val instanceof Boolean) {
+      return ((Boolean) val) ? 1.0d : 0.0d;
+    } else {
+      // best effort casting
+      return Double.parseDouble(val.toString());
+    }
+  }
+
+  public static boolean isColumnTypeSupported(Schema schema, Option<HoodieRecordType> recordType) {
+    Schema schemaToCheck = resolveNullableSchema(schema);
     // if the record type is set to AVRO, then RECORD, ARRAY, MAP and ENUM types are unsupported.
     if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) {
-      return (schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
-          && schema.getType() != Schema.Type.ENUM);
+      return (schemaToCheck.getType() != Schema.Type.RECORD && schemaToCheck.getType() != Schema.Type.ARRAY && schemaToCheck.getType() != Schema.Type.MAP
+          && schemaToCheck.getType() != Schema.Type.ENUM);
     }
     // if the record type is not set, or if it is SPARK, then RECORD, ARRAY, MAP and ENUM are unsupported, and so are BYTES and FIXED.
     // HUDI-8585 will add support for BYTES and FIXED
-    return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
-        && schema.getType() != Schema.Type.ENUM && schema.getType() != Schema.Type.BYTES && schema.getType() != Schema.Type.FIXED;
+    return schemaToCheck.getType() != Schema.Type.RECORD && schemaToCheck.getType() != Schema.Type.ARRAY && schemaToCheck.getType() != Schema.Type.MAP
+        && schemaToCheck.getType() != Schema.Type.ENUM && schemaToCheck.getType() != Schema.Type.BYTES && schemaToCheck.getType() != Schema.Type.FIXED;
   }

   public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
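
The resolveNullableSchema call matters because optional Avro columns arrive as unions like ["null","long"]; the support check must look at the non-null branch. A simplified stand-in (not Hudi's helper) showing the unwrap:

    import org.apache.avro.Schema;

    import java.util.Arrays;

    public class NullableSchemaSketch {
      // Simplified stand-in for resolveNullableSchema: unwrap ["null", T] unions.
      static Schema resolveNullable(Schema s) {
        if (s.getType() == Schema.Type.UNION) {
          return s.getTypes().stream()
              .filter(t -> t.getType() != Schema.Type.NULL)
              .findFirst()
              .orElse(s);
        }
        return s;
      }

      public static void main(String[] args) {
        Schema nullableLong = Schema.createUnion(Arrays.asList(
            Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)));
        // Before the unwrap, the type reads UNION and would fail the check.
        System.out.println(resolveNullable(nullableLong).getType()); // LONG
      }
    }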
@@ -2379,28 +2472,34 @@ public class HoodieTableMetadataUtil {
   private static Stream<HoodieRecord> collectAndProcessColumnMetadata(
       List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata,
-      String partitionPath, boolean isTightBound) {
-    return collectAndProcessColumnMetadata(partitionPath, isTightBound, Option.empty(), fileColumnMetadata.stream().flatMap(List::stream));
+      String partitionPath, boolean isTightBound,
+      Map<String, Schema> colsToIndexSchemaMap) {
+    return collectAndProcessColumnMetadata(partitionPath, isTightBound, Option.empty(), fileColumnMetadata.stream().flatMap(List::stream), colsToIndexSchemaMap);
   }

   private static Stream<HoodieRecord> collectAndProcessColumnMetadata(Iterable<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadataIterable, String partitionPath,
-                                                                      boolean isTightBound, Option<String> indexPartitionOpt) {
+                                                                      boolean isTightBound, Option<String> indexPartitionOpt,
+                                                                      Map<String, Schema> colsToIndexSchemaMap) {
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata = new ArrayList<>();
     fileColumnMetadataIterable.forEach(fileColumnMetadata::add);
     // Group by Column Name
-    return collectAndProcessColumnMetadata(partitionPath, isTightBound, indexPartitionOpt, fileColumnMetadata.stream());
+    return collectAndProcessColumnMetadata(partitionPath, isTightBound, indexPartitionOpt, fileColumnMetadata.stream(), colsToIndexSchemaMap);
   }

   private static Stream<HoodieRecord> collectAndProcessColumnMetadata(String partitionPath, boolean isTightBound, Option<String> indexPartitionOpt,
-                                                                      Stream<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata) {
+                                                                      Stream<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata,
+                                                                      Map<String, Schema> colsToIndexSchemaMap) {
     // Group by Column Name
     Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnMetadataMap =
         fileColumnMetadata.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, Collectors.toList()));
     // Aggregate Column Ranges
     Stream<HoodieColumnRangeMetadata<Comparable>> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
-        .map(entry -> FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue()));
+        .map(entry -> FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue(), colsToIndexSchemaMap));
     // Create Partition Stats Records
     return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(Collectors.toList()), false, isTightBound, indexPartitionOpt);
@@ -2436,17 +2535,13 @@ public class HoodieTableMetadataUtil {
       return engineContext.emptyHoodieData();
     }
     Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ? Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient));
-    final List<String> columnsToIndex = getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig, lazyWriterSchemaOpt,
+    final Map<String, Schema> columnsToIndexSchemaMap = getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig, lazyWriterSchemaOpt,
         dataTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().empty(), recordTypeOpt);
-    if (columnsToIndex.isEmpty()) {
+    if (columnsToIndexSchemaMap.isEmpty()) {
       LOG.warn("No columns to index for partition stats index");
       return engineContext.emptyHoodieData();
     }
-    // filter columns with only supported types
-    final List<String> validColumnsToIndex = columnsToIndex.stream()
-        .filter(col -> SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) || validateDataTypeForPartitionStats(col, lazyWriterSchemaOpt.get().get()))
-        .collect(Collectors.toList());
-    LOG.debug("Indexing following columns for partition stats index: {}", validColumnsToIndex);
+    LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndexSchemaMap);
     // Group by partition path and collect file names (BaseFile and LogFiles)
     List<Pair<String, Set<String>>> partitionToFileNames = partitionInfoList.stream()
@@ -2463,11 +2558,11 @@ public class HoodieTableMetadataUtil {
       final String partitionPath = partitionInfo.getKey();
       // Step 1: Collect Column Metadata for Each File
       List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getValue().stream()
-          .map(fileName -> getFileStatsRangeMetadata(partitionPath, fileName, dataTableMetaClient, validColumnsToIndex, false,
+          .map(fileName -> getFileStatsRangeMetadata(partitionPath, fileName, dataTableMetaClient, new ArrayList<>(columnsToIndexSchemaMap.keySet()), false,
               metadataConfig.getMaxReaderBufferSize()))
           .collect(Collectors.toList());
-      return collectAndProcessColumnMetadata(fileColumnMetadata, partitionPath, true).iterator();
+      return collectAndProcessColumnMetadata(fileColumnMetadata, partitionPath, true, columnsToIndexSchemaMap).iterator();
     });
   }
@@ -2494,14 +2589,16 @@ public class HoodieTableMetadataUtil {
   }

   private static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(HoodiePairData<String, List<HoodieColumnRangeMetadata<Comparable>>> columnRangeMetadataPartitionPair,
-                                                                                 HoodieTableMetaClient dataMetaClient) {
+                                                                                 HoodieTableMetaClient dataMetaClient,
+                                                                                 Map<String, Schema> colsToIndexSchemaMap) {
     try {
       return columnRangeMetadataPartitionPair
           .flatMapValues(List::iterator)
           .groupByKey()
           .map(pair -> {
             final String partitionName = pair.getLeft();
-            return collectAndProcessColumnMetadata(pair.getRight(), partitionName, isShouldScanColStatsForTightBound(dataMetaClient), Option.empty());
+            return collectAndProcessColumnMetadata(pair.getRight(), partitionName, isShouldScanColStatsForTightBound(dataMetaClient), Option.empty(), colsToIndexSchemaMap);
           })
           .flatMap(recordStream -> recordStream.iterator());
     } catch (Exception e) {
@@ -2531,11 +2628,12 @@ public class HoodieTableMetadataUtil {
     HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
     Option<Schema> tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
     Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
-    List<String> columnsToIndex = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, writerSchemaOpt, false, recordTypeOpt);
-    if (columnsToIndex.isEmpty()) {
+    Map<String, Schema> columnsToIndexSchemaMap = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, writerSchemaOpt, false, recordTypeOpt);
+    if (columnsToIndexSchemaMap.isEmpty()) {
       return engineContext.emptyHoodieData();
     }
-    LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndex);
+    List<String> colsToIndex = new ArrayList<>(columnsToIndexSchemaMap.keySet());
+    LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndexSchemaMap.keySet());
     // Group by partitionPath and then gather write stats lists,
     // where each inner list contains HoodieWriteStat objects that have the same partitionPath.
     List<List<HoodieWriteStat>> partitionedWriteStats = new ArrayList<>(allWriteStats.stream()
@@ -2549,7 +2647,7 @@ public class HoodieTableMetadataUtil {
       final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
       // Step 1: Collect Column Metadata for Each File part of current commit metadata
       List<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata = partitionedWriteStat.stream()
          .flatMap(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, colsToIndex, tableSchema).stream()).collect(toList());
       if (shouldScanColStatsForTightBound) {
         checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
@@ -2563,7 +2661,7 @@ public class HoodieTableMetadataUtil {
             .collect(Collectors.toSet());
         // Fetch metadata table COLUMN_STATS partition records for above files
         List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata = tableMetadata
-            .getRecordsByKeyPrefixes(generateKeyPrefixes(columnsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
+            .getRecordsByKeyPrefixes(generateKeyPrefixes(colsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
             // schema and properties are ignored in getInsertValue, so simply pass as null
             .map(record -> ((HoodieMetadataPayload) record.getData()).getColumnStatMetadata())
             .filter(Option::isPresent)
@@ -2580,7 +2678,7 @@ public class HoodieTableMetadataUtil {
         return Pair.of(partitionName, fileColumnMetadata);
       });

-      return convertMetadataToPartitionStatsRecords(columnRangeMetadata, dataMetaClient);
+      return convertMetadataToPartitionStatsRecords(columnRangeMetadata, dataMetaClient, columnsToIndexSchemaMap);
     } catch (Exception e) {
       throw new HoodieException("Failed to generate column stats records for metadata table", e);
     }
@@ -2599,20 +2697,6 @@ public class HoodieTableMetadataUtil {
     }
   }

-  /**
-   * Given table schema and field to index, checks if field's data type is supported.
-   *
-   * @param columnToIndex column to index
-   * @param tableSchema   table schema
-   * @return true if field's data type is supported, false otherwise
-   */
-  @VisibleForTesting
-  public static boolean validateDataTypeForPartitionStats(String columnToIndex, Schema tableSchema) {
-    Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(tableSchema, columnToIndex);
-    // to be fixed HUDI-8680.
-    return isColumnTypeSupported(fieldSchema, Option.of(HoodieRecordType.SPARK));
-  }
-
   /**
    * Generate key prefixes for each combination of column name in {@param columnsToIndex} and {@param partitionName}.
    */
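
To summarize the coercion semantics introduced in this file, here is a condensed stand-in for castToLong (same rules, not the Hudi method itself) and what it yields for representative inputs:

    public class CastToLongSketch {
      static Long castToLong(Object val) {
        if (val == null) {
          return null;
        }
        if (val instanceof Number) {
          return ((Number) val).longValue(); // widens ints, truncates floats/doubles
        }
        if (val instanceof Boolean) {
          return ((Boolean) val) ? 1L : 0L;
        }
        return Long.parseLong(val.toString()); // best-effort fallback
      }

      public static void main(String[] args) {
        System.out.println(castToLong(42));     // 42
        System.out.println(castToLong(3.9f));   // 3 (truncation, as in the diff)
        System.out.println(castToLong(true));   // 1
        System.out.println(castToLong("1000")); // 1000
      }
    }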
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
index e73aaebc4ed..57cd85f3570 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.junit.jupiter.api.Test;

 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;

 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -42,7 +43,7 @@ public class TestBaseFileUtils {
         "path/to/file2", COLUMN_NAME, 3, 8, 1, 15, 120, 250);
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges);
+    HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges, Collections.emptyMap());
     // Step 3: Assertions
     assertEquals(PARTITION_PATH, result.getFilePath());
     assertEquals(COLUMN_NAME, result.getColumnName());
@@ -64,7 +65,7 @@ public class TestBaseFileUtils {
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges);
+    HoodieColumnRangeMetadata<Comparable> result = FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges, Collections.emptyMap());
     // Step 3: Assertions
     assertEquals(PARTITION_PATH, result.getFilePath());
     assertEquals(COLUMN_NAME, result.getColumnName());
@@ -85,6 +86,7 @@ public class TestBaseFileUtils {
         "path/to/file2", "columnName2", null, 8, 1, 15, 120, 250);
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    assertThrows(IllegalArgumentException.class, () -> FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges));
+    assertThrows(IllegalArgumentException.class, () -> FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges,
+        Collections.emptyMap()));
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index f7eba5e4d85..5026af91a96 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -365,7 +365,7 @@ public class ITTestSchemaEvolution {
         FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), false,
         HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED.key(), false,
         HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true,
-        HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "false", // HUDI-8587
+        HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true",
         HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
   }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 377b0121e71..4be1b26183d 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -69,7 +69,6 @@ import static org.apache.hudi.avro.TestHoodieAvroUtils.SCHEMA_WITH_AVRO_TYPES_ST
 import static org.apache.hudi.avro.TestHoodieAvroUtils.SCHEMA_WITH_NESTED_FIELD_STR;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.computeRevivedAndDeletedKeys;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileIDForFileGroup;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForPartitionStats;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.validateDataTypeForSecondaryOrExpressionIndex;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -389,8 +388,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     addNColumns(colNames, HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() + 10);
     List<String> expected = new ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     addNColumns(expected, HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue());
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
-        Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));

     //test with table schema < default
     int tableSchemaSize = HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() - 10;
@@ -399,7 +398,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     addNColumns(colNames, tableSchemaSize);
     expected = new ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     addNColumns(expected, tableSchemaSize);
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));

     //test with max val < tableSchema
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -410,7 +410,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     addNColumns(colNames, HoodieMetadataConfig.COLUMN_STATS_INDEX_MAX_COLUMNS.defaultValue() + 10);
     expected = new ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     addNColumns(expected, 3);
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));

     //test with max val > tableSchema
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -421,7 +422,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     addNColumns(colNames, tableSchemaSize);
     expected = new ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     addNColumns(expected, tableSchemaSize);
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));

     //test with list of cols and a nested field as well.
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -448,7 +450,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
         .noDefault()
         .endRecord();

-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(schema)), false).keySet()));

     //test with list of cols longer than config
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -462,7 +465,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected.add("col_1");
     expected.add("col_7");
     expected.add("col_11");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));

     //test with list of cols including meta cols than config
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -476,7 +480,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected.add("col_7");
     expected.add("col_11");
     expected.add("_hoodie_commit_seqno");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));

     //test with avro schema
     schema = new Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES_STR);
@@ -488,7 +493,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected.add("booleanField");
     expected.add("decimalField");
     expected.add("localTimestampMillisField");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), true));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(schema)), true).keySet()));

     //test with avro schema and nested fields and unsupported types
     schema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD_STR);
@@ -499,7 +505,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected = new ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     expected.add("firstname");
     expected.add("student.lastname");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(schema)), false).keySet()));

     //test with avro schema with max cols set
     schema = new Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES_STR);
@@ -510,9 +517,11 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected = new ArrayList<>(Arrays.asList(HoodieTableMetadataUtil.META_COLS_TO_ALWAYS_INDEX));
     expected.add("booleanField");
     expected.add("intField");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(schema)), false).keySet()));
     //test with avro schema with meta cols
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(schema))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(schema))), false).keySet()));

     //test with avro schema with type filter
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -535,10 +544,12 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected.add("current_date");
     expected.add("current_ts");
     expected.add("_hoodie_is_deleted");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA)), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA)), false).keySet()));
     //test with avro schema with meta cols
-    assertEquals(expected,
-        HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA))), false));
+    assertListEquality(expected,
+        new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+            Lazy.eagerly(Option.of(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA))), false).keySet()));

     //test with meta cols disabled
     tableConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
@@ -549,7 +560,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     addNColumns(colNames, tableSchemaSize);
     expected = new ArrayList<>();
     addNColumns(expected, tableSchemaSize);
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));

     //test with meta cols disabled with col list
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -562,7 +574,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected.add("col_1");
     expected.add("col_7");
     expected.add("col_11");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(getTableSchema(expected))), false));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(getTableSchema(expected))), false).keySet()));

     //test with meta cols disabled with avro schema
     metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -573,7 +586,14 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected.add("booleanField");
     expected.add("decimalField");
     expected.add("localTimestampMillisField");
-    assertEquals(expected, HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), true));
+    assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
+        Lazy.eagerly(Option.of(schema)), true).keySet()));
+  }
+
+  private void assertListEquality(List<String> expected, List<String> actual) {
+    Collections.sort(expected);
+    Collections.sort(actual);
+    assertEquals(expected, actual);
   }

   private void addNColumns(List<String> list, int n) {
@@ -611,19 +631,19 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
         .endRecord();

     // Test for primitive fields
-    assertTrue(validateDataTypeForPartitionStats("stringField", schema));
-    assertTrue(validateDataTypeForPartitionStats("intField", schema));
-    assertTrue(validateDataTypeForPartitionStats("booleanField", schema));
-    assertTrue(validateDataTypeForPartitionStats("floatField", schema));
-    assertTrue(validateDataTypeForPartitionStats("doubleField", schema));
-    assertTrue(validateDataTypeForPartitionStats("longField", schema));
-    assertTrue(validateDataTypeForPartitionStats("unionIntField", schema));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("stringField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("intField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("booleanField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("floatField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("doubleField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("longField").schema(), Option.empty()));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("unionIntField").schema(), Option.empty()));

     // Test for unsupported fields
-    assertFalse(validateDataTypeForPartitionStats("arrayField", schema));
-    assertFalse(validateDataTypeForPartitionStats("mapField", schema));
-    assertFalse(validateDataTypeForPartitionStats("structField", schema));
-    assertFalse(validateDataTypeForPartitionStats("bytesField", schema));
+    assertFalse(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("arrayField").schema(), Option.empty()));
+    assertFalse(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("mapField").schema(), Option.empty()));
+    assertFalse(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("structField").schema(), Option.empty()));
+    assertFalse(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("bytesField").schema(), Option.of(HoodieRecord.HoodieRecordType.SPARK)));

     // Test for logical types
     Schema dateFieldSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
@@ -631,7 +651,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
         .fields()
         .name("dateField").type(dateFieldSchema).noDefault()
         .endRecord();
-    assertTrue(validateDataTypeForPartitionStats("dateField", schema));
+    assertTrue(HoodieTableMetadataUtil.isColumnTypeSupported(schema.getField("dateField").schema(), Option.empty()));
   }

   @Test
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index a00a976c551..1523a838139 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -124,10 +124,10 @@ class ColumnStatsIndexSupport(spark: SparkSession,
shouldReadInMemory: Boolean,
prunedPartitions: Option[Set[String]] = None,
prunedFileNamesOpt: Option[Set[String]] = None)(block: DataFrame => T): T = {
- /*cachedColumnStatsIndexViews.get(targetColumns) match {
+ cachedColumnStatsIndexViews.get(targetColumns) match {
case Some(cachedDF) =>
block(cachedDF)
- case None =>*/
+ case None =>
val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = prunedFileNamesOpt match {
case Some(prunedFileNames) =>
val filterFunction = new SerializableFunction[HoodieMetadataColumnStats, java.lang.Boolean] {
@@ -166,7 +166,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
block(df)
}
}
- //}
+ }
}
}
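This is the functional core of the file: the cache lookup that was commented out is restored, so repeated loadTransposed calls for the same target columns are served from cachedColumnStatsIndexViews instead of rebuilding the transposed view. A hedged usage sketch, with spark, sourceTableSchema, metadataConfig and metaClient as placeholders:

    val csi = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
    // First call builds and caches the transposed column-stats view.
    csi.loadTransposed(Seq("rider"), shouldReadInMemory = true) { df => df.count() }
    // Same target columns again: served from the restored cache branch.
    csi.loadTransposed(Seq("rider"), shouldReadInMemory = true) { df => df.count() }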
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
index fc4e05f0eaa..0d2c5fc24bb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
@@ -52,7 +53,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -67,8 +67,10 @@ import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -78,6 +80,7 @@ import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_RE
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestColStatsRecordWithMetadataRecord extends HoodieSparkClientTestHarness {
@@ -303,6 +306,151 @@ public class TestColStatsRecordWithMetadataRecord extends HoodieSparkClientTestH
generateNColStatsEntriesAndValidateMerge((Functions.Function1<Random, Comparable>) random -> new BigDecimal(String.format(Locale.ENGLISH, "%5f", random.nextFloat())));
}
+ @Test
+ public void testGetColumnRangeInPartition() {
+ String relativePartitionPath = "relativePartitionPath";
+ String fileName = "file1";
+ String colName = "colA";
+ long nullCount = 10;
+ long valueCount = 100;
+ long totalSize = 10000;
+ long totalUncompressedSize = 1000;
+
+ // Integer vals
+ HoodieColumnRangeMetadata aIntegerVal = HoodieColumnRangeMetadata.create(fileName, colName, (Integer)1, (Integer)1000, nullCount, valueCount, totalSize, totalUncompressedSize);
+ HoodieColumnRangeMetadata bIntegerVal = HoodieColumnRangeMetadata.create(fileName, colName, (Integer)(-1), (Integer)10000, nullCount, valueCount, totalSize, totalUncompressedSize);
+
+ // Long vals
+ HoodieColumnRangeMetadata aLongVal = HoodieColumnRangeMetadata.create(fileName, colName, (Long)1L, (Long)1000L, nullCount, valueCount, totalSize, totalUncompressedSize);
+ HoodieColumnRangeMetadata bLongVal = HoodieColumnRangeMetadata.create(fileName, colName, (Long)(-1L), (Long)10000L, nullCount, valueCount, totalSize, totalUncompressedSize);
+
+ // Float vals
+ HoodieColumnRangeMetadata aFloatVal = HoodieColumnRangeMetadata.create(fileName, colName, new Float(1), new Float(1000.0), nullCount, valueCount, totalSize, totalUncompressedSize);
+ HoodieColumnRangeMetadata bFloatVal = HoodieColumnRangeMetadata.create(fileName, colName, new Float(-1.0), new Float(10000.0), nullCount, valueCount, totalSize, totalUncompressedSize);
+
+ // Double vals
+ HoodieColumnRangeMetadata aDoubleVal = HoodieColumnRangeMetadata.create(fileName, colName, new Double(0.1), new Double(1000.0), nullCount, valueCount, totalSize, totalUncompressedSize);
+ HoodieColumnRangeMetadata bDoubleVal = HoodieColumnRangeMetadata.create(fileName, colName, new Double(-1.0), new Double(10000.0), nullCount, valueCount, totalSize, totalUncompressedSize);
+
+ // String vals
+ HoodieColumnRangeMetadata aStringVal = HoodieColumnRangeMetadata.create(fileName, colName, new String("1"), new String("1000"), nullCount, valueCount, totalSize, totalUncompressedSize);
+ HoodieColumnRangeMetadata bStringVal = HoodieColumnRangeMetadata.create(fileName, colName, new String("-1"), new String("10000"), nullCount, valueCount, totalSize, totalUncompressedSize);
+
+ // Merging Integer and Integer.
+ HoodieColumnRangeMetadata actualColumnRange = mergeAndAssert(aIntegerVal, bIntegerVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.INT);
+ assertEquals(actualColumnRange.getMinValue(), (Integer)(-1));
+ assertEquals(actualColumnRange.getMaxValue(), (Integer)(10000));
+
+ // Merging Integer and Long.
+ actualColumnRange = mergeAndAssert(aIntegerVal, bLongVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.INT);
+ assertEquals(actualColumnRange.getMinValue(), (Integer)(-1));
+ assertEquals(actualColumnRange.getMaxValue(), (Integer)(10000));
+
+ // Merging Integer and Float
+ actualColumnRange = mergeAndAssert(aIntegerVal, bFloatVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.INT);
+ assertEquals(actualColumnRange.getMinValue(), (Integer)(-1));
+ assertEquals(actualColumnRange.getMaxValue(), (Integer)(10000));
+
+ // Merging Integer and Double
+ actualColumnRange = mergeAndAssert(aIntegerVal, bDoubleVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.INT);
+ assertEquals(actualColumnRange.getMinValue(), (Integer)(-1));
+ assertEquals(actualColumnRange.getMaxValue(), (Integer)(10000));
+
+ // Merging Integer and String
+ actualColumnRange = mergeAndAssert(aIntegerVal, bStringVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.INT);
+ assertEquals(actualColumnRange.getMinValue(), (Integer)(-1));
+ assertEquals(actualColumnRange.getMaxValue(), (Integer)(10000));
+
+ // Long and Long
+ actualColumnRange = mergeAndAssert(aLongVal, bLongVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.LONG);
+ assertEquals(actualColumnRange.getMinValue(), (Long)(-1L));
+ assertEquals(actualColumnRange.getMaxValue(), (Long)(10000L));
+
+ // Merging Long and Integer
+ actualColumnRange = mergeAndAssert(aLongVal, bIntegerVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.LONG);
+ assertEquals(actualColumnRange.getMinValue(), (Long)(-1L));
+ assertEquals(actualColumnRange.getMaxValue(), (Long)(10000L));
+
+ // Merging Long and Float
+ actualColumnRange = mergeAndAssert(aLongVal, bFloatVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.LONG);
+ assertEquals(actualColumnRange.getMinValue(), (Long)(-1L));
+ assertEquals(actualColumnRange.getMaxValue(), (Long)(10000L));
+
+ // Merging Long and Double
+ actualColumnRange = mergeAndAssert(aLongVal, bDoubleVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.LONG);
+ assertEquals(actualColumnRange.getMinValue(), (Long)(-1L));
+ assertEquals(actualColumnRange.getMaxValue(), (Long)(10000L));
+
+ // Merging Long and String
+ actualColumnRange = mergeAndAssert(aLongVal, bStringVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.LONG);
+ assertEquals(actualColumnRange.getMinValue(), (Long)(-1L));
+ assertEquals(actualColumnRange.getMaxValue(), (Long)(10000L));
+
+ // Float and Float
+ actualColumnRange = mergeAndAssert(aFloatVal, bFloatVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.FLOAT);
+ assertEquals(actualColumnRange.getMinValue(), new Float(-1));
+ assertEquals(actualColumnRange.getMaxValue(), new Float(10000));
+
+ // Merging Float and Integer
+ actualColumnRange = mergeAndAssert(aFloatVal, bIntegerVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.FLOAT);
+ assertEquals(actualColumnRange.getMinValue(), new Float(-1));
+ assertEquals(actualColumnRange.getMaxValue(), new Float(10000));
+
+ // Merging Float and Long.
+ actualColumnRange = mergeAndAssert(aFloatVal, bLongVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.FLOAT);
+ assertEquals(actualColumnRange.getMinValue(), new Float(-1));
+ assertEquals(actualColumnRange.getMaxValue(), new Float(10000));
+
+ // Merging Float and String
+ actualColumnRange = mergeAndAssert(aFloatVal, bStringVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.FLOAT);
+ assertEquals(actualColumnRange.getMinValue(), new Float(-1));
+ assertEquals(actualColumnRange.getMaxValue(), new Float(10000));
+
+ // Double and Double
+ actualColumnRange = mergeAndAssert(aDoubleVal, bDoubleVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.DOUBLE);
+ assertEquals(actualColumnRange.getMinValue(), new Double(-1));
+ assertEquals(actualColumnRange.getMaxValue(), new Double(10000));
+
+ // Merging Double and Integer
+ actualColumnRange = mergeAndAssert(aDoubleVal, bIntegerVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.DOUBLE);
+ assertEquals(actualColumnRange.getMinValue(), new Double(-1));
+ assertEquals(actualColumnRange.getMaxValue(), new Double(10000));
+
+ // Merging Double and Long.
+ actualColumnRange = mergeAndAssert(aDoubleVal, bLongVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.DOUBLE);
+ assertEquals(actualColumnRange.getMinValue(), new Double(-1));
+ assertEquals(actualColumnRange.getMaxValue(), new Double(10000));
+
+ // Merging Double and String
+ actualColumnRange = mergeAndAssert(aDoubleVal, bStringVal, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize, Schema.Type.DOUBLE);
+ assertTrue(actualColumnRange.getMinValue().compareTo(new Double(-1)) == 0);
+ assertTrue(actualColumnRange.getMaxValue().compareTo(new Double(10000)) == 0);
+ }
+
+ private HoodieColumnRangeMetadata mergeAndAssert(HoodieColumnRangeMetadata<Comparable> aVal, HoodieColumnRangeMetadata<Comparable> bVal, String relativePartitionPath, String colName, long nullCount,
+ long totalSize, long totalUncompressedSize, Schema.Type schemaType) {
+ List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = new ArrayList<>();
+ fileColumnRanges.add(aVal);
+ fileColumnRanges.add(bVal);
+ Map<String, Schema> colsToIndexSchemaMap = new HashMap<>();
+ colsToIndexSchemaMap.put(colName, Schema.create(schemaType));
+
+ HoodieColumnRangeMetadata actualColumnRange = FileFormatUtils.getColumnRangeInPartition(relativePartitionPath, fileColumnRanges, colsToIndexSchemaMap);
+
+ validateColumnRangeMetadata(actualColumnRange, relativePartitionPath, colName, nullCount, totalSize, totalUncompressedSize);
+ return actualColumnRange;
+ }
+
+ private void validateColumnRangeMetadata(HoodieColumnRangeMetadata actualColumnRange, String relativePartitionPath, String colName, long nullCount, long totalSize,
+ long totalUncompressedSize) {
+ assertEquals(actualColumnRange.getFilePath(), relativePartitionPath);
+ assertEquals(actualColumnRange.getColumnName(), colName);
+ assertEquals(actualColumnRange.getNullCount(), nullCount * 2);
+ assertEquals(actualColumnRange.getTotalSize(), totalSize * 2);
+ assertEquals(actualColumnRange.getTotalUncompressedSize(), totalUncompressedSize * 2);
+ }
+
private void generateNColStatsEntriesAndValidateMerge(Functions.Function1<Random, Comparable> randomValueGenFunc) {
String fileName = "abc.parquet";
String colName = "colName";
@@ -375,7 +523,7 @@ public class TestColStatsRecordWithMetadataRecord extends HoodieSparkClientTestH
if (doCommit) {
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
boolean committed = client.commitStats(instant, writeStats, Option.empty(), metaClient.getCommitActionType());
- Assertions.assertTrue(committed);
+ assertTrue(committed);
}
metaClient = HoodieTableMetaClient.reload(metaClient);
return writeStatuses;
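The assertions in testGetColumnRangeInPartition pin down the merge semantics of FileFormatUtils.getColumnRangeInPartition: per-file ranges of one column collapse into a single partition-level range (min of mins, max of maxes, null/size counters summed), with min/max coerced to the Avro type supplied for the column in the schema map, hence Integer results even when one input carried a Long or String. A hedged Scala model of that fold, not the actual implementation:

    // Sketch of the semantics only; the Range type and field names are invented.
    case class Range(min: BigDecimal, max: BigDecimal, nulls: Long, size: Long, rawSize: Long)

    def merge(a: Range, b: Range): Range =
      Range(a.min.min(b.min), a.max.max(b.max), a.nulls + b.nulls, a.size + b.size, a.rawSize + b.rawSize)

    val perFile = Seq(Range(1, 1000, 10, 10000, 1000), Range(-1, 10000, 10, 10000, 1000))
    val partitionRange = perFile.reduce(merge) // Range(-1, 10000, 20, 20000, 2000)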
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index 21fc5e621ff..85160db1009 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -280,7 +280,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
protected def validateColumnsToIndex(metaClient: HoodieTableMetaClient, expectedColsToIndex: Seq[String]): Unit = {
val indexDefn = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS)
- assertEquals(expectedColsToIndex, indexDefn.getSourceFields.asScala.toSeq)
+ assertEquals(expectedColsToIndex.sorted, indexDefn.getSourceFields.asScala.toSeq.sorted)
}
protected def validateNonExistantColumnsToIndexDefn(metaClient: HoodieTableMetaClient): Unit = {
@@ -301,9 +301,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
val localSourceTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
val columnStatsIndex = new ColumnStatsIndexSupport(spark, localSourceTableSchema, metadataConfig, metaClient)
- val lazyOptTableSchema : Lazy[org.apache.hudi.common.util.Option[Schema]] = Lazy.eagerly(org.apache.hudi.common.util.Option.of(tableSchema))
- val indexedColumnswithMeta: Set[String] = HoodieTableMetadataUtil
- .getColumnsToIndex(metaClient.getTableConfig, metadataConfig, lazyOptTableSchema, false).asScala.toSet
+ val indexedColumnswithMeta: Set[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSet
val indexedColumns = indexedColumnswithMeta.filter(colName => !HoodieTableMetadataUtil.META_COL_SET_TO_INDEX.contains(colName))
val sortedIndexedColumns : Set[String] = TreeSet(indexedColumns.toSeq:_*)
val (expectedColStatsSchema, _) = composeIndexSchema(sortedIndexedColumns.toSeq, indexedColumns.toSeq, localSourceTableSchema)
@@ -345,10 +343,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
val schemaUtil = new TableSchemaResolver(metaClient)
val tableSchema = schemaUtil.getTableAvroSchema(false)
- val localSourceTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
- val lazyOptTableSchema : Lazy[org.apache.hudi.common.util.Option[Schema]] = Lazy.eagerly(org.apache.hudi.common.util.Option.of(tableSchema))
- val indexedColumnswithMeta: Set[String] = HoodieTableMetadataUtil
- .getColumnsToIndex(metaClient.getTableConfig, metadataConfig, lazyOptTableSchema, false).asScala.toSet
+ val indexedColumnswithMeta: Set[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSet
val pIndexedColumns = indexedColumnswithMeta.filter(colName => !HoodieTableMetadataUtil.META_COL_SET_TO_INDEX.contains(colName))
.toSeq.sorted
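Both hunks above switch the tests from recomputing the indexed columns via getColumnsToIndex to reading the source fields persisted in the table's index metadata, which this patch treats as the source of truth once the index exists. The read path, assuming a metaClient for a table whose column-stats index is already built:

    import scala.collection.JavaConverters._

    val indexedCols: Set[String] = metaClient.getIndexMetadata.get()
      .getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS)
      .getSourceFields.asScala.toSet
    // Drop the always-indexed meta columns to get only the data columns.
    val dataCols = indexedCols.filterNot(HoodieTableMetadataUtil.META_COL_SET_TO_INDEX.contains)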
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index fc746646de3..179adc37165 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -54,7 +54,7 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
- HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false" // HUDI-8587
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true"
)
val verificationCol: String = "driver"
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 09bd1238f75..bb75cb168c5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -942,15 +942,9 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
metaClient = HoodieTableMetaClient.reload(metaClient)
val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).build()
val columnStatsIndex = new ColumnStatsIndexSupport(spark, inputDF1.schema, metadataConfig, metaClient)
- columnStatsIndex.loadTransposed(Seq("fare", "city_to_state", "rider"), shouldReadInMemory = true) { emptyTransposedColStatsDF =>
- // fare is a nested column, so it should not have any min/max value as it is not comparable, but still have nullCount
- assertEquals(0, emptyTransposedColStatsDF.filter("fare_minValue IS NOT NULL").count())
- assertEquals(0, emptyTransposedColStatsDF.filter("fare_maxValue IS NOT NULL").count())
- assertTrue(emptyTransposedColStatsDF.filter("fare_nullCount IS NOT NULL").count() > 0)
- // city_to_state is a map column, so it should not have any min/max value as it is not comparable, but still have nullCount
- assertEquals(0, emptyTransposedColStatsDF.filter("city_to_state_minValue IS NOT NULL").count())
- assertEquals(0, emptyTransposedColStatsDF.filter("city_to_state_maxValue IS NOT NULL").count())
- assertTrue(emptyTransposedColStatsDF.filter("city_to_state_nullCount IS NOT NULL").count() > 0)
+ columnStatsIndex.loadTransposed(Seq("fare", "city_to_state", "rider"), shouldReadInMemory = true) { emptyTransposedColStatsDF =>
+ assertTrue(!emptyTransposedColStatsDF.columns.contains("fare"))
+ assertTrue(!emptyTransposedColStatsDF.columns.contains("city_to_state"))
// rider is a simple string field, so it should have a min/max value as well as nullCount
assertTrue(emptyTransposedColStatsDF.filter("rider_minValue IS NOT NULL").count() > 0)
assertTrue(emptyTransposedColStatsDF.filter("rider_maxValue IS NOT NULL").count() > 0)
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 6947611e921..d8d94ec34aa 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -25,7 +25,6 @@ import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -156,7 +155,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStrea
protected HoodieDeltaStreamer.Config getDeltaStreamerConfig(String[] transformerClasses, boolean nullForDeletedCols,
TypedProperties extraProps) throws IOException {
- extraProps.setProperty(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),"false"); // HUDI-8587
extraProps.setProperty("hoodie.datasource.write.table.type", tableType);
extraProps.setProperty("hoodie.datasource.write.row.writer.enable", rowWriterEnable.toString());