This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new baf141abbd6 [HUDI-8681] Unifying supported cols across col stats and partition stats index (#12638)
baf141abbd6 is described below
commit baf141abbd6da022c66fa518588e34452a6902b4
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Jan 17 02:59:57 2025 -0800
[HUDI-8681] Unifying supported cols across col stats and partition stats index (#12638)
---
.../hudi/client/HoodieColumnStatsIndexUtils.java | 4 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 18 +++++---
.../hudi/metadata/HoodieTableMetadataUtil.java | 50 +++++++++++++---------
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 6 ++-
4 files changed, 48 insertions(+), 30 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
index 6b2feff15ec..adcbd3c2e4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -65,7 +66,8 @@ public class HoodieColumnStatsIndexUtils {
HoodieCommitMetadata.class);
if (mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) {
// update data table's table config for list of columns indexed.
- List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig());
+ List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig(),
+ Option.of(config.getRecordMerger().getRecordType()));
// if col stats is getting updated, lets also update list of columns indexed if changed.
updateColSatsFunc.apply(dataTable.getMetaClient(), columnsToIndex);
}
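For readers skimming the wrapped hunk above, the new call shape at this call site reconstructs to the sketch below (only names visible in the surrounding diff context are used; indentation is illustrative):

    // The record type resolved from the configured record merger is now threaded
    // into column-stats column resolution, so col stats and partition stats agree
    // on which columns are supported.
    List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(
        commitMetadata,
        dataTable.getMetaClient(),
        config.getMetadataConfig(),
        Option.of(config.getRecordMerger().getRecordType()));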
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index b21758a15df..797624f8859 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -56,6 +56,7 @@ import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -440,7 +441,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
continue;
}
- fileGroupCountAndRecordsPair = initializePartitionStatsIndex(partitionInfoList);
+ fileGroupCountAndRecordsPair = initializePartitionStatsIndex();
partitionName = PARTITION_STATS.getPartitionPath();
break;
case SECONDARY_INDEX:
@@ -524,9 +525,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
}
}
- private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) throws IOException {
+ private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex() throws IOException {
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, getPartitionFileSlicePairs(), dataWriteConfig.getMetadataConfig(), dataMetaClient,
- Option.of(new Schema.Parser().parse(dataWriteConfig.getWriteSchema())));
+ Option.of(new Schema.Parser().parse(dataWriteConfig.getWriteSchema())), Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
final int fileGroupCount = dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
}
@@ -534,7 +535,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
// Find the columns to index
final List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
- dataWriteConfig.getMetadataConfig(), Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient)), true);
+ dataWriteConfig.getMetadataConfig(), Either.right(Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient))), true,
+ Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
if (columnsToIndex.isEmpty()) {
@@ -1090,7 +1092,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, getTableMetadata(),
dataWriteConfig.getMetadataConfig(), enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
- dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType());
+ dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType(),
+ Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
// Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
@@ -1111,7 +1114,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
Map<String, HoodieData<HoodieRecord>> partitionToRecordMap = HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, getTableMetadata(), dataWriteConfig.getMetadataConfig(),
- enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType());
+ enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType(),
+ Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(records, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), records.union(additionalUpdates));
updateExpressionIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
@@ -1231,7 +1235,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, cleanMetadata, instantTime, dataMetaClient, dataWriteConfig.getMetadataConfig(), enabledPartitionTypes,
- dataWriteConfig.getBloomIndexParallelism()));
+ dataWriteConfig.getBloomIndexParallelism(), Option.of(dataWriteConfig.getRecordMerger().getRecordType())));
closeInternal();
}
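The clean path mirrors the commit path: update(HoodieCleanMetadata, String) now forwards the same record-type option. Reconstructed from the hunk above as a sketch (field names are those of HoodieBackedTableMetadataWriter shown in context):

    // Clean-metadata conversion now also receives the record merger's record type,
    // so column stats rebuilt during clean use the unified column-selection logic.
    processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(
        engineContext, cleanMetadata, instantTime, dataMetaClient,
        dataWriteConfig.getMetadataConfig(), enabledPartitionTypes,
        dataWriteConfig.getBloomIndexParallelism(),
        Option.of(dataWriteConfig.getRecordMerger().getRecordType())));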
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index bce5b5d7acb..827c4a0c17d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -378,7 +378,8 @@ public class HoodieTableMetadataUtil {
public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext context, HoodieConfig hoodieConfig, HoodieCommitMetadata commitMetadata,
String instantTime, HoodieTableMetaClient dataMetaClient, HoodieTableMetadata tableMetadata,
HoodieMetadataConfig metadataConfig, List<MetadataPartitionType> enabledPartitionTypes, String bloomFilterType,
- int bloomIndexParallelism, Integer writesFileIdEncoding, EngineType engineType) {
+ int bloomIndexParallelism, int writesFileIdEncoding, EngineType engineType,
+ Option<HoodieRecordType> recordTypeOpt) {
final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
final HoodieData<HoodieRecord> filesPartitionRecordsRDD = context.parallelize(
convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 1);
@@ -392,7 +393,7 @@ public class HoodieTableMetadataUtil {
if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> metadataColumnStatsRDD = convertMetadataToColumnStatsRecords(commitMetadata, context,
- dataMetaClient, metadataConfig);
+ dataMetaClient, metadataConfig, recordTypeOpt);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), metadataColumnStatsRDD);
}
if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS)) {
@@ -400,7 +401,7 @@ public class HoodieTableMetadataUtil {
"Column stats partition must be enabled to generate partition stats.
Please enable: " +
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
// Generate Hoodie Pair data of partition name and list of column range
metadata for all the files in that partition
final HoodieData<HoodieRecord> partitionStatsRDD = convertMetadataToPartitionStatRecords(commitMetadata, context,
- dataMetaClient, tableMetadata, metadataConfig);
+ dataMetaClient, tableMetadata, metadataConfig, recordTypeOpt);
partitionToRecordsMap.put(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), partitionStatsRDD);
}
if (enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX)) {
@@ -575,7 +576,8 @@ public class HoodieTableMetadataUtil {
HoodieTableMetaClient dataMetaClient,
HoodieMetadataConfig metadataConfig,
List<MetadataPartitionType> enabledPartitionTypes,
- int bloomIndexParallelism) {
+ int bloomIndexParallelism,
+ Option<HoodieRecordType> recordTypeOpt) {
final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
final HoodieData<HoodieRecord> filesPartitionRecordsRDD = engineContext.parallelize(
convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1);
@@ -589,11 +591,12 @@ public class HoodieTableMetadataUtil {
if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> metadataColumnStatsRDD = convertMetadataToColumnStatsRecords(cleanMetadata, engineContext,
- dataMetaClient, metadataConfig);
+ dataMetaClient, metadataConfig, recordTypeOpt);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), metadataColumnStatsRDD);
}
if (enabledPartitionTypes.contains(MetadataPartitionType.EXPRESSION_INDEX)) {
- convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata, instantTime, dataMetaClient, metadataConfig, bloomIndexParallelism, partitionToRecordsMap);
+ convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata, instantTime, dataMetaClient, metadataConfig, bloomIndexParallelism, partitionToRecordsMap,
+ recordTypeOpt);
}
return partitionToRecordsMap;
@@ -602,7 +605,8 @@ public class HoodieTableMetadataUtil {
private static void convertMetadataToExpressionIndexRecords(HoodieEngineContext engineContext, HoodieCleanMetadata cleanMetadata,
String instantTime, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, int bloomIndexParallelism,
- Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap) {
+ Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap,
+ Option<HoodieRecordType> recordTypeOpt) {
Option<HoodieIndexMetadata> indexMetadata = dataMetaClient.getIndexMetadata();
if (indexMetadata.isPresent()) {
HoodieIndexMetadata metadata = indexMetadata.get();
@@ -626,7 +630,7 @@ public class HoodieTableMetadataUtil {
.withColumnStatsIndexForColumns(String.join(",", indexDefinition.getSourceFields()))
.build();
partitionToRecordsMap.put(indexName,
- convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, dataMetaClient, modifiedMetadataConfig));
+ convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, dataMetaClient, modifiedMetadataConfig, recordTypeOpt));
} else {
throw new HoodieMetadataException("Unsupported expression index type");
}
@@ -749,7 +753,8 @@ public class HoodieTableMetadataUtil {
public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
HoodieEngineContext engineContext,
HoodieTableMetaClient dataMetaClient,
- HoodieMetadataConfig metadataConfig) {
+ HoodieMetadataConfig metadataConfig,
+ Option<HoodieRecordType> recordTypeOpt) {
List<Pair<String, String>> deleteFileList = new ArrayList<>();
cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
// Files deleted from a partition
@@ -758,7 +763,7 @@ public class HoodieTableMetadataUtil {
});
List<String> columnsToIndex = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
- Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)), false);
+ Either.right(Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient))), false, recordTypeOpt);
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
@@ -1451,7 +1456,8 @@ public class HoodieTableMetadataUtil {
public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
HoodieEngineContext engineContext,
HoodieTableMetaClient dataMetaClient,
- HoodieMetadataConfig metadataConfig) {
+ HoodieMetadataConfig metadataConfig,
+ Option<HoodieRecordType> recordTypeOpt) {
List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream).collect(Collectors.toList());
@@ -1460,7 +1466,7 @@ public class HoodieTableMetadataUtil {
}
try {
- List<String> columnsToIndex = getColumnsToIndex(commitMetadata, dataMetaClient, metadataConfig);
+ List<String> columnsToIndex = getColumnsToIndex(commitMetadata, dataMetaClient, metadataConfig, recordTypeOpt);
if (columnsToIndex.isEmpty()) {
// In case there are no columns to index, bail
return engineContext.emptyHoodieData();
@@ -1475,7 +1481,8 @@ public class HoodieTableMetadataUtil {
}
}
- public static List<String> getColumnsToIndex(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig) {
+ public static List<String> getColumnsToIndex(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig,
+ Option<HoodieRecordType> recordTypeOpt) {
Option<Schema> writerSchema = Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
.flatMap(writerSchemaStr ->
@@ -1490,7 +1497,7 @@ public class HoodieTableMetadataUtil {
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
return getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
- Lazy.eagerly(tableSchema), false);
+ Either.right(Lazy.eagerly(tableSchema)), false, recordTypeOpt);
}
@VisibleForTesting
@@ -1521,7 +1528,8 @@ public class HoodieTableMetadataUtil {
return getColumnsToIndex(tableConfig, metadataConfig, Either.right(tableSchema), isTableInitializing, Option.empty());
}
- private static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
+ @VisibleForTesting
+ public static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
HoodieMetadataConfig metadataConfig,
Either<List<String>, Lazy<Option<Schema>>> tableSchema,
boolean isTableInitializing,
@@ -2417,13 +2425,14 @@ public class HoodieTableMetadataUtil {
List<Pair<String, FileSlice>> partitionInfoList,
HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataTableMetaClient,
- Option<Schema> writerSchemaOpt) {
+ Option<Schema> writerSchemaOpt,
+ Option<HoodieRecordType> recordTypeOpt) {
if (partitionInfoList.isEmpty()) {
return engineContext.emptyHoodieData();
}
Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ? Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient));
- final List<String> columnsToIndex = getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig, lazyWriterSchemaOpt,
- dataTableMetaClient.getActiveTimeline().filterCompletedInstants().empty());
+ final List<String> columnsToIndex = getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig, Either.right(lazyWriterSchemaOpt),
+ dataTableMetaClient.getActiveTimeline().filterCompletedInstants().empty(), recordTypeOpt);
if (columnsToIndex.isEmpty()) {
LOG.warn("No columns to index for partition stats index");
return engineContext.emptyHoodieData();
@@ -2496,7 +2505,8 @@ public class HoodieTableMetadataUtil {
}
public static HoodieData<HoodieRecord> convertMetadataToPartitionStatRecords(HoodieCommitMetadata commitMetadata, HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient,
- HoodieTableMetadata tableMetadata, HoodieMetadataConfig metadataConfig) {
+ HoodieTableMetadata tableMetadata, HoodieMetadataConfig metadataConfig,
+ Option<HoodieRecordType> recordTypeOpt) {
// In this function we fetch column range metadata for all new files part of commit metadata along with all the other files
// of the affected partitions. The column range metadata is grouped by partition name to generate HoodiePairData of partition name
// and list of column range metadata for that partition files. This pair data is then used to generate partition stat records.
@@ -2516,7 +2526,7 @@ public class HoodieTableMetadataUtil {
HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
Option<Schema> tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
Lazy<Option<Schema>> writerSchemaOpt = Lazy.eagerly(tableSchema);
- List<String> columnsToIndex = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, writerSchemaOpt, false);
+ List<String> columnsToIndex = getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, Either.right(writerSchemaOpt), false, recordTypeOpt);
if (columnsToIndex.isEmpty()) {
return engineContext.emptyHoodieData();
}
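Net effect of the HoodieTableMetadataUtil changes: the public getColumnsToIndex / convertMetadataTo*Records entry points now take an Option<HoodieRecordType>, and schema arguments funnel through the Either-based overload. A minimal caller-side sketch, assuming the usual Hudi import paths and a writeConfig, metaClient and commitMetadata already in scope (placeholders, not part of this commit; other imports elided):

    import java.util.List;
    import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
    import org.apache.hudi.common.util.Option;

    // Preferred on the write path: derive the record type from the configured record merger.
    Option<HoodieRecordType> recordTypeOpt = Option.of(writeConfig.getRecordMerger().getRecordType());

    // Option.empty() is also accepted, as the updated tests show.
    List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(
        commitMetadata, metaClient, writeConfig.getMetadataConfig(), recordTypeOpt);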
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 2136d9ef22e..e9e92eecce5 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -161,7 +161,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
.withPartitionStatsIndexParallelism(1)
.build(),
metaClient,
- Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
+ Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
+ Option.empty());
// Validate the result.
validatePartitionStats(result, instant1, instant2);
}
@@ -275,7 +276,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
.withPartitionStatsIndexParallelism(1)
.build(),
metaClient,
- Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS));
+ Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
+ Option.empty());
// Validate the result.
validatePartitionStats(result, instant1, instant2, 6);
}
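Both test hunks make the same adjustment: the schema option stays and an empty record-type option is appended as the new trailing argument. The enclosing call is outside the hunk context, but given the argument shape it is most likely the convertFilesToPartitionStatsRecords entry point changed earlier in this commit; the local names below are illustrative only:

    // Hypothetical reconstruction of the updated test call; Option.empty() exercises
    // the code path where no record type is supplied.
    HoodieData<HoodieRecord> result = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
        engineContext,
        partitionFileSlicePairs,
        HoodieMetadataConfig.newBuilder().withPartitionStatsIndexParallelism(1).build(),
        metaClient,
        Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
        Option.empty());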