This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 16d686c7404 [HUDI-8342] Log warning and skip building partition stats for non partitioned table (#12089)
16d686c7404 is described below
commit 16d686c7404176e602e2b196e85b3a36010ec33c
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Oct 15 11:13:31 2024 -0700
[HUDI-8342] Log warning and skip building partition stats for non partitioned table (#12089)
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../metadata/HoodieBackedTableMetadataWriter.java | 14 +++++++++++++
.../functional/PartitionStatsIndexTestBase.scala | 2 +-
.../hudi/functional/TestPartitionStatsIndex.scala | 24 +++++++++-------------
.../functional/TestSecondaryIndexPruning.scala | 6 ++++--
4 files changed, 29 insertions(+), 17 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f340d86d102..86e6e19528f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -86,6 +87,7 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -387,6 +389,18 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
})
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ // validate that each index is eligible to be initialized
+ Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator();
+ while (iterator.hasNext()) {
+ MetadataPartitionType partitionType = iterator.next();
+ if (partitionType == PARTITION_STATS &&
!dataMetaClient.getTableConfig().isTablePartitioned()) {
+ LOG.warn("Partition stats index cannot be enabled for a
non-partitioned table. Removing from initialization list. Please disable {}",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key());
+ iterator.remove();
+ this.enabledPartitionTypes.remove(partitionType);
+ }
+ }
+
for (MetadataPartitionType partitionType : partitionsToInit) {
// Find the commit timestamp to use for this partition. Each
initialization should use its own unique commit time.
String commitTimeForPartition =
generateUniqueCommitInstantTime(initializationTime);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
index 08d8c68b0d0..b55fa124180 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
@@ -235,7 +235,7 @@ class PartitionStatsIndexTestBase extends
HoodieSparkClientTestBase {
}
}
- private def getInstantTime: String = {
+ protected def getInstantTime: String = {
String.format("%03d", new Integer(instantTime.incrementAndGet()))
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index 25f3258d8a5..6a52feba966 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -19,7 +19,7 @@
package org.apache.hudi.functional
-import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL,
MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
@@ -87,20 +87,16 @@ class TestPartitionStatsIndex extends
PartitionStatsIndexTestBase {
}
/**
- * Test case to do a write with updates for non-partitioned table and
validate the partition stats index.
+ * Test case to write with updates for non-partitioned table and validate
the partition stats index is not created.
*/
- @ParameterizedTest
- @EnumSource(classOf[HoodieTableType])
- def testIndexWithUpsertNonPartitioned(tableType: HoodieTableType): Unit = {
- val hudiOpts = commonOpts - PARTITIONPATH_FIELD.key +
(DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
- doWriteAndValidateDataAndPartitionStats(
- hudiOpts,
- operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Overwrite)
- doWriteAndValidateDataAndPartitionStats(
- hudiOpts,
- operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ @Test
+ def testIndexWithUpsertNonPartitioned(): Unit = {
+ val hudiOpts = commonOpts - PARTITIONPATH_FIELD.key +
(DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL)
+ doWriteAndValidateDataAndPartitionStats(hudiOpts, operation =
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Overwrite,
false)
+ doWriteAndValidateDataAndPartitionStats(hudiOpts, operation =
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append,
false)
+ // there should not be any partition stats
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
}
/**
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 395da41dff1..a8f55c514b7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -345,6 +345,8 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
val sqlTableType = if
(tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
tableName += "test_secondary_index_with_partition_stats_index" + (if
(isPartitioned) "_partitioned" else "") + sqlTableType
val partitionedByClause = if (isPartitioned) "partitioned
by(partition_key_col)" else ""
+ val partitionStatsEnable = if (isPartitioned)
"'hoodie.metadata.index.partition.stats.enable' = 'true'," else ""
+ val columnsToIndex = if (isPartitioned)
"'hoodie.metadata.index.column.stats.column.list' = 'name'," else ""
spark.sql(
s"""
@@ -361,8 +363,8 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'record_key_col',
- | 'hoodie.metadata.index.partition.stats.enable' = 'true',
- | 'hoodie.metadata.index.column.stats.column.list' = 'name',
+ | $partitionStatsEnable
+ | $columnsToIndex
| hoodie.enable.data.skipping = 'true'
| )
| $partitionedByClause