This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch release-1.0.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2187205b280606b2019139185be669431d40e5d1 Author: Lin Liu <[email protected]> AuthorDate: Fri Apr 4 07:23:59 2025 -0700 [HUDI-9258] Disable partial update when global index is used (#13086) * Disable global index for merge into query * Fix import styles * Add flag and test * Fix schema usage * Fix compilation * remove custom merge mode validation --------- Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: Sagar Sumit <[email protected]> (cherry picked from commit 2e5ccb42ad503b36623461cf4ccc987f1ccfc877) --- .../org/apache/hudi/io/HoodieMergedReadHandle.java | 2 +- .../hudi/command/MergeIntoHoodieTableCommand.scala | 30 +++++++- .../analysis/TestMergeIntoHoodieTableCommand.scala | 81 ++++++++++++++++++++++ .../hudi/dml/TestPartialUpdateForMergeInto.scala | 81 ++++++++++++++++++++-- 4 files changed, 184 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java index 8195e248e13..ca7e5d7d428 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java @@ -68,7 +68,7 @@ public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K Pair<String, String> partitionPathFileIDPair, Option<FileSlice> fileSliceOption) { super(config, instantTime, hoodieTable, partitionPathFileIDPair); - readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); + readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); // config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data. 
baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField()); fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice(); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 31ba77fafcb..2551f7cbcf9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -24,13 +24,14 @@ import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_SCHEMA import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.RecordMergeMode import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecordMerger} -import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion} import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys import org.apache.hudi.common.util.StringUtils -import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.config.{HoodieHBaseIndexConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA} import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException} import org.apache.hudi.hive.HiveSyncConfigHolder +import org.apache.hudi.index.HoodieIndex import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.util.JFunction.scalaFunction1Noop @@ -491,7 +492,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie && parameters.getOrElse( 
ENABLE_MERGE_INTO_PARTIAL_UPDATES.key, ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean - && updatingActions.nonEmpty) + && updatingActions.nonEmpty + && (parameters.getOrElse(HoodieWriteConfig.WRITE_TABLE_VERSION.key, HoodieTableVersion.current().versionCode().toString).toInt + >= HoodieTableVersion.EIGHT.versionCode()) + && !useGlobalIndex(parameters)) } private def getOperationType(parameters: Map[String, String]) = { @@ -1057,6 +1061,26 @@ object MergeIntoHoodieTableCommand { ) } } + + // Check if global index, e.g., GLOBAL_BLOOM, is set. + def useGlobalIndex(parameters: Map[String, String]): Boolean = { + parameters.get(HoodieIndexConfig.INDEX_TYPE.key).exists { indexType => + isGlobalIndexEnabled(indexType, parameters) + } + } + + // Check if global index is enabled for specific indexes. + def isGlobalIndexEnabled(indexType: String, parameters: Map[String, String]): Boolean = { + Seq( + HoodieIndex.IndexType.GLOBAL_SIMPLE -> HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH_ENABLE, + HoodieIndex.IndexType.GLOBAL_BLOOM -> HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE, + HoodieIndex.IndexType.RECORD_INDEX -> HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE, + HoodieIndex.IndexType.HBASE -> HoodieHBaseIndexConfig.UPDATE_PARTITION_PATH_ENABLE + ).collectFirst { + case (hoodieIndex, config) if indexType == hoodieIndex.name => + parameters.getOrElse(config.key, config.defaultValue().toString).toBoolean + }.getOrElse(false) + } } object PartialAssignmentMode extends Enumeration { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala new file mode 100644 index 00000000000..9f55f449452 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala @@
-0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.analysis + +import org.apache.hudi.config.{HoodieHBaseIndexConfig, HoodieIndexConfig} +import org.apache.hudi.index.HoodieIndex + +import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class TestMergeIntoHoodieTableCommand extends AnyFlatSpec with Matchers { + "useGlobalIndex" should "return true when global index is enabled" in { + val globalIndices = Seq( + HoodieIndex.IndexType.GLOBAL_SIMPLE -> HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH_ENABLE, + HoodieIndex.IndexType.GLOBAL_BLOOM -> HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE, + HoodieIndex.IndexType.RECORD_INDEX -> HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE, + HoodieIndex.IndexType.HBASE -> HoodieHBaseIndexConfig.UPDATE_PARTITION_PATH_ENABLE + ) + + globalIndices.foreach { case (indexType, config) => + val parameters = Map( + HoodieIndexConfig.INDEX_TYPE.key -> indexType.name, + config.key -> "true" + ) + MergeIntoHoodieTableCommand.useGlobalIndex(parameters) should be(true) + } + } + + it should "return 
false when update partition path is disabled" in { + val globalIndices = Seq( + HoodieIndex.IndexType.GLOBAL_SIMPLE -> HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH_ENABLE, + HoodieIndex.IndexType.GLOBAL_BLOOM -> HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE, + HoodieIndex.IndexType.RECORD_INDEX -> HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE, + HoodieIndex.IndexType.HBASE -> HoodieHBaseIndexConfig.UPDATE_PARTITION_PATH_ENABLE + ) + + globalIndices.foreach { case (indexType, config) => + val parameters = Map( + HoodieIndexConfig.INDEX_TYPE.key -> indexType.name, + config.key -> "false" + ) + MergeIntoHoodieTableCommand.useGlobalIndex(parameters) should be(false) + } + } + + it should "return false when index type is absent" in { + val parameters = Map.empty[String, String] + MergeIntoHoodieTableCommand.useGlobalIndex(parameters) should be(false) + } + + it should "return false when index type is not global" in { + val nonGlobalIndices = Seq( + HoodieIndex.IndexType.SIMPLE.name, + HoodieIndex.IndexType.INMEMORY.name, + HoodieIndex.IndexType.BLOOM.name, + HoodieIndex.IndexType.BUCKET.name, + HoodieIndex.IndexType.FLINK_STATE.name) + nonGlobalIndices.foreach { indexType => + val parameters = Map(HoodieIndexConfig.INDEX_TYPE.key -> indexType) + MergeIntoHoodieTableCommand.useGlobalIndex(parameters) should be(false) + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala index c12f78f27d3..70c81dd2a26 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala @@ -19,24 +19,21 @@ package org.apache.spark.sql.hudi.dml import 
org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} import org.apache.hudi.avro.HoodieAvroUtils -import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieReaderConfig, HoodieStorageConfig} -import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig} import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion, TableSchemaResolver} import org.apache.hudi.common.table.log.HoodieLogFileReader import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType import org.apache.hudi.common.table.timeline.HoodieTimeline -import org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewStorageConfig, SyncableFileSystemView} import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.common.util.CompactionUtils import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieNotSupportedException -import org.apache.hudi.metadata.HoodieTableMetadata import org.apache.avro.Schema import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getMetaClientAndFileSystemView -import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField} +import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import java.util.{Collections, List, Optional} @@ -525,6 +522,78 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { |)""".stripMargin) } + test("Partial updates for table version 6 and 8 handled gracefully in MIT") { + 
Seq(HoodieTableVersion.SIX.versionCode(), HoodieTableVersion.EIGHT.versionCode()).foreach( + tableVersion => withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + | CREATE TABLE $tableName ( + | record_key STRING, + | name STRING, + | age INT, + | department STRING, + | salary DOUBLE, + | ts BIGINT + | ) USING hudi + | PARTITIONED BY (department) + | LOCATION '${tmp.getCanonicalPath}' + | TBLPROPERTIES ( + | type = 'mor', + | hoodie.write.table.version = '$tableVersion', + | hoodie.index.type = 'GLOBAL_SIMPLE', + | hoodie.index.global.index.enable = 'true', + | hoodie.bloom.index.use.metadata = 'true', + | primaryKey = 'record_key', + | preCombineField = 'ts')""".stripMargin) + + spark.sql( + s""" + | INSERT INTO $tableName + | SELECT * FROM ( + | SELECT 'emp_001' as record_key, 'John Doe' as name, 30 as age, + | 'Sales' as department, 80000.0 as salary, 1598886000 as ts + | UNION ALL + | SELECT 'emp_002', 'Jane Smith', 28, 'Sales', 75000.0, 1598886001 + | UNION ALL + | SELECT 'emp_003', 'Bob Wilson', 35, 'Marketing', 85000.0, 1598886002 + |)""".stripMargin) + + spark.sql( + s""" + | UPDATE $tableName + | SET + | ts = 1598000000 + | WHERE record_key = 'emp_001' + |""".stripMargin) + + spark.sql( + s""" + | CREATE OR REPLACE TEMPORARY VIEW source_updates AS + | SELECT * FROM ( + | SELECT 'emp_001' as record_key, 'John Doe' as name, 30 as age, + | 'Engineering' as department, CAST(95000.0 as DOUBLE) as salary, cast(1598886200 as BIGINT) as ts + | UNION ALL + | SELECT 'emp_004', 'Alice Brown', 29, 'Engineering', CAST(82000.0 as DOUBLE), cast(1598886201 as BIGINT) + |)""".stripMargin) + + spark.sql( + s""" + | MERGE INTO $tableName t + | USING source_updates s + | ON t.record_key = s.record_key + | WHEN MATCHED THEN + | UPDATE SET + | record_key = s.record_key, + | department = s.department, + | salary = s.salary, + | ts = s.ts + | WHEN NOT MATCHED THEN + | INSERT *""".stripMargin) + } + ) + } + def validateLogBlock(basePath: String, 
expectedNumLogFile: Int, changedFields: Seq[Seq[String]],
