This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch release-1.0.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2187205b280606b2019139185be669431d40e5d1
Author: Lin Liu <[email protected]>
AuthorDate: Fri Apr 4 07:23:59 2025 -0700

    [HUDI-9258] Disable partial update when global index is used (#13086)
    
    * Disable global index for merge into query
    
    * Fix import styles
    
    * Add flag and test
    
    * Fix schema usage
    
    * Fix compilation
    
    * remove custom merge mode validation
    
    ---------
    
    Co-authored-by: Lokesh Jain <[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
    (cherry picked from commit 2e5ccb42ad503b36623461cf4ccc987f1ccfc877)
---
 .../org/apache/hudi/io/HoodieMergedReadHandle.java |  2 +-
 .../hudi/command/MergeIntoHoodieTableCommand.scala | 30 +++++++-
 .../analysis/TestMergeIntoHoodieTableCommand.scala | 81 ++++++++++++++++++++++
 .../hudi/dml/TestPartialUpdateForMergeInto.scala   | 81 ++++++++++++++++++++--
 4 files changed, 184 insertions(+), 10 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index 8195e248e13..ca7e5d7d428 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -68,7 +68,7 @@ public class HoodieMergedReadHandle<T, I, K, O> extends 
HoodieReadHandle<T, I, K
                                 Pair<String, String> partitionPathFileIDPair,
                                 Option<FileSlice> fileSliceOption) {
     super(config, instantTime, hoodieTable, partitionPathFileIDPair);
-    readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
+    readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getWriteSchema()), 
config.allowOperationMetadataField());
     // config.getSchema is not canonicalized, while config.getWriteSchema is 
canonicalized. So, we have to use the canonicalized schema to read the existing 
data.
     baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getWriteSchema()), 
config.allowOperationMetadataField());
     fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : 
getLatestFileSlice();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 31ba77fafcb..2551f7cbcf9 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -24,13 +24,14 @@ import 
org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_SCHEMA
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.RecordMergeMode
 import org.apache.hudi.common.model.{HoodieAvroRecordMerger, 
HoodieRecordMerger}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
 import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys
 import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieHBaseIndexConfig, HoodieIndexConfig, 
HoodieWriteConfig}
 import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, 
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
 import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
 import org.apache.hudi.hive.HiveSyncConfigHolder
+import org.apache.hudi.index.HoodieIndex
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.JFunction.scalaFunction1Noop
 
@@ -491,7 +492,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       && parameters.getOrElse(
       ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
       ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
-      && updatingActions.nonEmpty)
+      && updatingActions.nonEmpty
+      && (parameters.getOrElse(HoodieWriteConfig.WRITE_TABLE_VERSION.key, 
HoodieTableVersion.current().versionCode().toString).toInt
+      >= HoodieTableVersion.EIGHT.versionCode())
+      && !useGlobalIndex(parameters))
   }
 
   private def getOperationType(parameters: Map[String, String]) = {
@@ -1057,6 +1061,26 @@ object MergeIntoHoodieTableCommand {
       )
     }
   }
+
+  // Check if global index, e.g., GLOBAL_BLOOM, is set.
+  def useGlobalIndex(parameters: Map[String, String]): Boolean = {
+    parameters.get(HoodieIndexConfig.INDEX_TYPE.key).exists { indexType =>
+      isGlobalIndexEnabled(indexType, parameters)
+    }
+  }
+
+  // Check if global index is enabled for specific indexes.
+  def isGlobalIndexEnabled(indexType: String, parameters: Map[String, 
String]): Boolean = {
+    Seq(
+      HoodieIndex.IndexType.GLOBAL_SIMPLE -> 
HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH_ENABLE,
+      HoodieIndex.IndexType.GLOBAL_BLOOM -> 
HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE,
+      HoodieIndex.IndexType.RECORD_INDEX -> 
HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE,
+      HoodieIndex.IndexType.HBASE -> 
HoodieHBaseIndexConfig.UPDATE_PARTITION_PATH_ENABLE
+    ).collectFirst {
+      case (hoodieIndex, config) if indexType == hoodieIndex.name =>
+        parameters.getOrElse(config.key, 
config.defaultValue().toString).toBoolean
+    }.getOrElse(false)
+  }
 }
 
 object PartialAssignmentMode extends Enumeration {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala
new file mode 100644
index 00000000000..9f55f449452
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.config.{HoodieHBaseIndexConfig, HoodieIndexConfig}
+import org.apache.hudi.index.HoodieIndex
+
+import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class TestMergeIntoHoodieTableCommand extends AnyFlatSpec with Matchers {
+  "useGlobalIndex" should "return true when global index is enabled" in {
+    val globalIndices = Seq(
+      HoodieIndex.IndexType.GLOBAL_SIMPLE -> 
HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH_ENABLE,
+      HoodieIndex.IndexType.GLOBAL_BLOOM -> 
HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE,
+      HoodieIndex.IndexType.RECORD_INDEX -> 
HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE,
+      HoodieIndex.IndexType.HBASE -> 
HoodieHBaseIndexConfig.UPDATE_PARTITION_PATH_ENABLE
+    )
+
+    globalIndices.foreach { case (indexType, config) =>
+      val parameters = Map(
+        HoodieIndexConfig.INDEX_TYPE.key -> indexType.name,
+        config.key -> "true"
+      )
+      MergeIntoHoodieTableCommand.useGlobalIndex(parameters) should be(true)
+    }
+  }
+
+  it should "return false when update partition path is disabled" in {
+    val globalIndices = Seq(
+      HoodieIndex.IndexType.GLOBAL_SIMPLE -> 
HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH_ENABLE,
+      HoodieIndex.IndexType.GLOBAL_BLOOM -> 
HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE,
+      HoodieIndex.IndexType.RECORD_INDEX -> 
HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE,
+      HoodieIndex.IndexType.HBASE -> 
HoodieHBaseIndexConfig.UPDATE_PARTITION_PATH_ENABLE
+    )
+
+    globalIndices.foreach { case (indexType, config) =>
+      val parameters = Map(
+        HoodieIndexConfig.INDEX_TYPE.key -> indexType.name,
+        config.key -> "false"
+      )
+      MergeIntoHoodieTableCommand.useGlobalIndex(parameters) should be(false)
+    }
+  }
+
+  it should "return false when index type is absent" in {
+    val parameters = Map.empty[String, String]
+    MergeIntoHoodieTableCommand.useGlobalIndex(parameters) should be(false)
+  }
+
+  it should "return false when index type is not global" in {
+    val nonGlobalIndices = Seq(
+      HoodieIndex.IndexType.SIMPLE.name,
+      HoodieIndex.IndexType.INMEMORY.name,
+      HoodieIndex.IndexType.BLOOM.name,
+      HoodieIndex.IndexType.BUCKET.name,
+      HoodieIndex.IndexType.FLINK_STATE.name)
+    nonGlobalIndices.foreach { indexType =>
+      val parameters = Map(HoodieIndexConfig.INDEX_TYPE.key -> indexType)
+      MergeIntoHoodieTableCommand.useGlobalIndex(parameters) should be(false)
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index c12f78f27d3..70c81dd2a26 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -19,24 +19,21 @@ package org.apache.spark.sql.hudi.dml
 
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.config.{HoodieCommonConfig, 
HoodieMetadataConfig, HoodieReaderConfig, HoodieStorageConfig}
-import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
-import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
HoodieTableVersion, TableSchemaResolver}
 import org.apache.hudi.common.table.log.HoodieLogFileReader
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType
 import org.apache.hudi.common.table.timeline.HoodieTimeline
-import org.apache.hudi.common.table.view.{FileSystemViewManager, 
FileSystemViewStorageConfig, SyncableFileSystemView}
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.common.util.CompactionUtils
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, 
HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieNotSupportedException
-import org.apache.hudi.metadata.HoodieTableMetadata
 
 import org.apache.avro.Schema
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import 
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getMetaClientAndFileSystemView
-import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField}
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 
 import java.util.{Collections, List, Optional}
@@ -525,6 +522,78 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
          |)""".stripMargin)
   }
 
+  test("Partial updates for table version 6 and 8 handled gracefully in MIT") {
+    Seq(HoodieTableVersion.SIX.versionCode(), 
HoodieTableVersion.EIGHT.versionCode()).foreach(
+      tableVersion => withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             | CREATE TABLE $tableName (
+             |   record_key STRING,
+             |   name STRING,
+             |   age INT,
+             |   department STRING,
+             |   salary DOUBLE,
+             |   ts BIGINT
+             | ) USING hudi
+             | PARTITIONED BY (department)
+             | LOCATION '${tmp.getCanonicalPath}'
+             | TBLPROPERTIES (
+             |   type = 'mor',
+             |   hoodie.write.table.version = '$tableVersion',
+             |   hoodie.index.type = 'GLOBAL_SIMPLE',
+             |   hoodie.index.global.index.enable = 'true',
+             |   hoodie.bloom.index.use.metadata = 'true',
+             |   primaryKey = 'record_key',
+             |   preCombineField = 'ts')""".stripMargin)
+
+        spark.sql(
+          s"""
+             | INSERT INTO $tableName
+             | SELECT * FROM (
+             |   SELECT 'emp_001' as record_key, 'John Doe' as name, 30 as age,
+             |          'Sales' as department, 80000.0 as salary, 1598886000 
as ts
+             |   UNION ALL
+             |   SELECT 'emp_002', 'Jane Smith', 28, 'Sales', 75000.0, 
1598886001
+             |   UNION ALL
+             |   SELECT 'emp_003', 'Bob Wilson', 35, 'Marketing', 85000.0, 
1598886002
+             |)""".stripMargin)
+
+        spark.sql(
+          s"""
+             | UPDATE $tableName
+             | SET
+             |     ts = 1598000000
+             | WHERE record_key = 'emp_001'
+             |""".stripMargin)
+
+        spark.sql(
+          s"""
+             | CREATE OR REPLACE TEMPORARY VIEW source_updates AS
+             | SELECT * FROM (
+             |   SELECT 'emp_001' as record_key, 'John Doe' as name, 30 as age,
+             |          'Engineering' as department, CAST(95000.0 as DOUBLE) 
as salary, cast(1598886200 as BIGINT) as ts
+             |   UNION ALL
+             |   SELECT 'emp_004', 'Alice Brown', 29, 'Engineering', 
CAST(82000.0 as DOUBLE), cast(1598886201 as BIGINT)
+             |)""".stripMargin)
+
+        spark.sql(
+          s"""
+             | MERGE INTO $tableName t
+             | USING source_updates s
+             | ON t.record_key = s.record_key
+             | WHEN MATCHED THEN
+             |   UPDATE SET
+             |     record_key = s.record_key,
+             |     department = s.department,
+             |     salary = s.salary,
+             |     ts = s.ts
+             | WHEN NOT MATCHED THEN
+             |   INSERT *""".stripMargin)
+      }
+    )
+  }
+
   def validateLogBlock(basePath: String,
                        expectedNumLogFile: Int,
                        changedFields: Seq[Seq[String]],

Reply via email to