Davis-Zhang-Onehouse opened a new issue, #13893:
URL: https://github.com/apache/hudi/issues/13893

   **_Tips before filing an issue_**
   
   **Describe the problem you faced**
   
   When upgrading to table version 9 from 6 with only RLI and files MDT 
partitions using a sql insert, I hit the following stack trace (check stack 
trace section)
   
   Also I didn't explicitly set hoodie.metadata.index.column.stats.enable  to 
false yet when bootstrap the MDT in version 6 does not create col stats but 
only files partition.
   
   git hash of hudi internal that reproduce the issue 
a749d6b9a557ebead5227ac9421513ac57afde57
   .hoodie folder after the issue happens
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Use recent hudi master branch with git hash 
a749d6b9a557ebead5227ac9421513ac57afde57
   2. create v6 table with MDT off
   3. run bootstrap mdt, create RLI and upgrade using this code 
   ```
   import scala.io.Source
   import org.apache.hudi.benchmarks.TPCDS
   import org.apache.spark.sql.SparkSession
   import scala.util.{Try, Success, Failure}
   import java.io.{File, PrintWriter, FileWriter, BufferedWriter}
   import java.time.{LocalDateTime, Instant, ZoneOffset}
   import java.time.format.DateTimeFormatter
   import scala.collection.mutable.{ArrayBuffer, HashSet}
   import java.nio.file.{Files, Paths, StandardOpenOption}
   import org.apache.hudi.client.SparkRDDWriteClient
   import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
   import org.apache.hudi.common.table.HoodieTableMetaClient
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.common.HoodiePendingRollbackInfo
   import org.apache.hudi.common.util.{Option => HOption}
   import org.apache.hudi.HoodieCLIUtils
   import org.apache.hudi.hadoop.fs.HadoopFSUtils
   import scala.collection.JavaConverters._
   
   // ========================================
   // Procedure Definitions
   // ========================================
   object Procedure extends Enumeration {
     val ROLLBACK_PENDING_INSTANTS = Value("ROLLBACK_PENDING_INSTANTS")
     val BOOTSTRAP_METADATA = Value("BOOTSTRAP_METADATA")
     val DISABLE_METADATA = Value("DISABLE_METADATA")
     val UPGRADE_TABLE_VERSION = Value("UPGRADE_TABLE_VERSION")
     val CREATE_RECORD_INDEX = Value("CREATE_RECORD_INDEX")
     val DROP_SECONDARY_INDICES = Value("DROP_SECONDARY_INDICES")
     val CREATE_SECONDARY_INDICES = Value("CREATE_SECONDARY_INDICES")
     val VERIFY_TABLE = Value("VERIFY_TABLE")
   }
   
   val procedurePipeline = Seq(
     // First rollback any pending instants
     // (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
     //   "tableName" -> "catalog_sales"
     // )),
     (Procedure.BOOTSTRAP_METADATA, Map(
       "tableName" -> "catalog_sales"
     )),
     // (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
     //   "tableName" -> "catalog_sales"
     // )),
     (Procedure.CREATE_RECORD_INDEX, Map(
       "rliGroupCount" -> -1,
       "tableName" -> "catalog_sales",
       "recordKeys" -> Seq("cs_item_sk", "cs_order_number")
     )),
     (Procedure.UPGRADE_TABLE_VERSION, Map(
       "tableName" -> "catalog_sales"
     )),
     // (Procedure.CREATE_SECONDARY_INDICES, Map(
     //   "tableName" -> "catalog_sales",
     //   "columnsToIndex" -> Seq("cs_order_number", "cs_item_sk", 
"cs_sold_date_sk", "cs_bill_customer_sk")
     // )),
     // (Procedure.VERIFY_TABLE, Map(
     //   "tableName" -> "catalog_sales",
     //   "partitionCol" -> "cs_sold_date_sk"
     // )),
   
     // // First rollback any pending instants
     // (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
     //   "tableName" -> "store_sales"
     // )),
     // (Procedure.BOOTSTRAP_METADATA, Map(
     //   "tableName" -> "store_sales"
     // )),
     // (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
     //   "tableName" -> "store_sales"
     // )),
     // (Procedure.CREATE_RECORD_INDEX, Map(
     //   "rliGroupCount" -> -1,
     //   "tableName" -> "store_sales",
     //   "recordKeys" -> Seq("ss_item_sk", "ss_ticket_number")
     // )),
     // (Procedure.CREATE_SECONDARY_INDICES, Map(
     //   "tableName" -> "store_sales",
     //   "columnsToIndex" -> Seq("ss_store_sk", "ss_item_sk", "ss_customer_sk")
     // )),
     // (Procedure.VERIFY_TABLE, Map(
     //   "tableName" -> "store_sales",
     //   "partitionCol" -> "ss_sold_date_sk"
     // )),
   
     // // First rollback any pending instants
     // (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
     //   "tableName" -> "web_sales"
     // )),
     // (Procedure.BOOTSTRAP_METADATA, Map(
     //   "tableName" -> "web_sales"
     // )),
     // (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
     //   "tableName" -> "web_sales"
     // )),
     // (Procedure.CREATE_RECORD_INDEX, Map(
     //   "rliGroupCount" -> -1,
     //   "tableName" -> "web_sales",
     //   "recordKeys" -> Seq("ws_item_sk", "ws_order_number")
     // )),
     // (Procedure.CREATE_SECONDARY_INDICES, Map(
     //   "tableName" -> "web_sales",
     //   "columnsToIndex" -> Seq("ws_order_number")
     // )),
     // (Procedure.VERIFY_TABLE, Map(
     //   "tableName" -> "web_sales",
     //   "partitionCol" -> "ws_sold_date_sk"
     // )),
   )
   
   // ========================================
   // Utility Functions
   // ========================================
   object TableUtils {
     
     def getTablePath(spark: SparkSession, tableName: String): String = {
       val tablePathDF = spark.sql(s"DESCRIBE EXTENDED $tableName")
       val locationRow = tablePathDF.filter("col_name = 'Location'").collect()
       
       if (locationRow.isEmpty) {
         throw new Exception(s"Could not find location for table $tableName")
       }
       
       locationRow(0).getString(1)
     }
     
     def createWriteClientAndMetaClient(spark: SparkSession, tableName: 
String): (SparkRDDWriteClient[_], HoodieTableMetaClient) = {
       val tablePath = getTablePath(spark, tableName)
       
       // Create write client using HoodieCLIUtils
       val writeClient = HoodieCLIUtils.createHoodieWriteClient(
         spark,
         tablePath,
         Map.empty[String, String],
         Option(tableName)
       ).asInstanceOf[SparkRDDWriteClient[_]]
       
       // Create a fresh metaClient after write client creation
       val metaClient = HoodieTableMetaClient.reload(
         HoodieTableMetaClient.builder()
           
.setConf(org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
           .setBasePath(tablePath)
           .build()
       )
       
       (writeClient, metaClient)
     }
   }
   
   // ========================================
   // Procedure Functions
   // ========================================
   object ProcedureFunctions {
   
     def rollbackPendingInstants(params: Map[String, Any]): Try[Unit] = Try {
       val tableName = params("tableName").asInstanceOf[String]
   
       println(s"\n=== Rolling back pending instants for $tableName ===")
   
       val tablePath = TableUtils.getTablePath(spark, tableName)
       println(s"Table path: $tablePath")
   
       println("Checking for pending/inflight instants...")
   
       // Create a write client to access rollback functionality
       var writeClient: SparkRDDWriteClient[_] = null
       var metaClient: HoodieTableMetaClient = null
       try {
         val (client, meta) = TableUtils.createWriteClientAndMetaClient(spark, 
tableName)
         writeClient = client
         metaClient = meta
   
         // Use the public rollbackFailedWrites method which handles everything 
internally
         println("Attempting to rollback failed writes...")
         writeClient.rollBackFailedWrites(metaClient)
         println("✓ Rollback check completed")
   
         val rollbackPerformed = writeClient.rollbackFailedWrites(metaClient)
   
         if (rollbackPerformed) {
           println("✓ Successfully rolled back failed writes")
         } else {
           println("No failed writes found to rollback")
         }
   
         // Run clean operation after rollback using write client
         println("\n=== Running clean operation after rollback ===")
         Try {
           // Run clean using write client
           val cleanResult = writeClient.clean()
         } match {
           case Success(_) =>
           case Failure(e) => 
             println(s"Clean operation warning: ${e.getMessage}")
             e.printStackTrace()
         }
       } catch {
         case e: Exception =>
           println(s"Error during rollback: ${e.getMessage}")
           e.printStackTrace()
           // Don't rethrow as this is a best-effort operation
       } finally {
         if (writeClient != null) {
           Try(writeClient.close())
         }
       }
     }
     
     def bootstrapMetadata(params: Map[String, Any]): Try[Unit] = Try {
       val tableName = params("tableName").asInstanceOf[String]
       
       val tablePath = TableUtils.getTablePath(spark, tableName)
       val metaClient = HoodieTableMetaClient.reload(
           HoodieTableMetaClient.builder()
             
.setConf(org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
             .setBasePath(tablePath)
             .build()
         )
   
       println(s"\n=== Dummy dml to trigger MDT bootstrap $tableName ===")
       
       println(s"Running dummy INSERT to bootstrap metadata...")
       spark.sql(s"set hoodie.metadata.enable=true").show(false)
       spark.sql(s"set 
hoodie.write.table.version=${metaClient.getTableConfig().getTableVersion().versionCode()}")
         .show(false)
       
       // Get table schema to create empty row with proper types
       val tableDF = spark.table(tableName)
       val schema = tableDF.schema
       
       // Create an empty DataFrame with the same schema
       val emptyDF = 
spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], 
schema)
       emptyDF.createOrReplaceTempView("empty_data")
       
       // Insert empty data (0 rows) to trigger metadata bootstrap
       spark.sql(s"INSERT INTO $tableName SELECT * FROM empty_data").show(false)
       println(s"Successfully bootstrapped metadata for $tableName")
     }
     
     def disableMetadata(params: Map[String, Any]): Try[Unit] = Try {
       val tableName = params("tableName").asInstanceOf[String]
       
       println(s"\n=== Temporarily disabling metadata for $tableName ===")
       
       // Disable metadata
       println(s"Disabling metadata...")
       spark.sql(s"set hoodie.metadata.enable=false").show(false)
       
       // Get table schema to create empty row with proper types
       val tableDF = spark.table(tableName)
       val schema = tableDF.schema
       
       // Create an empty DataFrame with the same schema
       val emptyDF = 
spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], 
schema)
       emptyDF.createOrReplaceTempView("empty_disable_data")
       
       // Insert empty data (0 rows) with metadata disabled
       println(s"Running dummy INSERT with metadata disabled...")
       spark.sql(s"INSERT INTO $tableName SELECT * FROM 
empty_disable_data").show(false)
       
       // Re-enable metadata
       println(s"Re-enabling metadata...")
       spark.sql(s"set hoodie.metadata.enable=true").show(false)
       
       println(s"Successfully completed metadata disable/enable cycle for 
$tableName")
     }
     
     def upgradeTableVersion(params: Map[String, Any]): Try[Unit] = Try {
       val tableName = params("tableName").asInstanceOf[String]
   
       println(s"\n=== Upgrading $tableName to version 9 ===")
       spark.sql("set hoodie.write.table.version=9").show(false)
       spark.sql("set hoodie.write.auto.upgrade=true").show(false)
   
       println(s"Running dummy INSERT to trigger upgrade...")
       
       // Get table schema to create empty row with proper types
       val tableDF = spark.table(tableName)
       val schema = tableDF.schema
       
       // Create an empty DataFrame with the same schema
       val emptyDF = 
spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], 
schema)
       emptyDF.createOrReplaceTempView("empty_upgrade_data")
       
       // Insert empty data (0 rows) to trigger table upgrade
       spark.sql(s"INSERT INTO $tableName SELECT * FROM 
empty_upgrade_data").show(false)
       println(s"Successfully triggered upgrade for $tableName")
     }
     
     def createRecordIndex(params: Map[String, Any]): Try[Unit] = Try {
       val tableName = params("tableName").asInstanceOf[String]
       val recordKeys = params("recordKeys").asInstanceOf[Seq[String]]
       val rliGroupCount = params("rliGroupCount").asInstanceOf[Int]
       
       println(s"\n=== Creating record index (RLI) for $tableName ===")
       
       val indicesDF = spark.sql(s"SHOW INDEXES FROM $tableName")
       val indices = indicesDF.collect()
       val hasRecordIndex = indices.exists(row => row.getString(0) == 
"record_index")
       
       if (!hasRecordIndex) {
         val recordKeysStr = recordKeys.mkString(", ")
         println(s"Creating record index on columns: $recordKeysStr")
         
         // Set RLI configuration parameters
         println("Setting RLI configuration parameters...")
         spark.sql("set 
hoodie.metadata.record.index.min.filegroup.count=10").show(false)
         spark.sql("set 
hoodie.metadata.record.index.max.filegroup.count=1000000").show(false)
         
         if (rliGroupCount > 0) {
           spark.sql(s"set 
hoodie.metadata.record.index.min.filegroup.count=$rliGroupCount").show(false)
           spark.sql(s"set 
hoodie.metadata.record.index.max.filegroup.count=$rliGroupCount").show(false)
         }
         
         spark.sql(s"CREATE INDEX record_index ON $tableName 
($recordKeysStr)").show(false)
         println(s"Successfully created record index for $tableName")
         
         // Reset RLI configuration parameters
         spark.sql("set 
hoodie.metadata.record.index.min.filegroup.count=10").show(false)
         spark.sql("set 
hoodie.metadata.record.index.max.filegroup.count=1000000").show(false)
       } else {
         println(s"Record index already exists for $tableName")
       }
     }
     
     def dropSecondaryIndices(params: Map[String, Any]): Try[Unit] = Try {
       val tableName = params("tableName").asInstanceOf[String]
       
       println(s"\n=== Dropping existing secondary indices for $tableName ===")
       
       val indicesDF = spark.sql(s"SHOW INDEXES FROM $tableName")
       val indices = indicesDF.collect()
       var droppedCount = 0
       
       indices.foreach { row =>
         val indexName = row.getString(0)
         // Drop indices that start with idx_ or secondary_index_
         if (indexName.startsWith("idx_") || 
indexName.startsWith("secondary_index_")) {
           Try {
             spark.sql(s"DROP INDEX $indexName ON $tableName").show(false)
             println(s"  Dropped index: $indexName")
             droppedCount += 1
           } match {
             case Success(_) =>
             case Failure(e) => 
               println(s"  Error dropping index $indexName: ${e.getMessage}")
               e.printStackTrace()
           }
         }
       }
       
       if (droppedCount == 0) {
         println("  No secondary indices to drop")
       } else {
         println(s"  Dropped $droppedCount indices")
       }
     }
     
     def createSecondaryIndices(params: Map[String, Any]): Try[Unit] = Try {
       val tableName = params("tableName").asInstanceOf[String]
       val columnsToIndex = params("columnsToIndex").asInstanceOf[Seq[String]]
       
       println(s"\n=== Creating secondary indices for $tableName ===")
       println(s"  Creating indices for ${columnsToIndex.size} columns")
       
       var successCount = 0
       var failCount = 0
       
       columnsToIndex.foreach { column =>
         val indexName = s"idx_$column"
         Try {
           println(s"  Creating index: $indexName")
           spark.sql(s"CREATE INDEX $indexName ON $tableName 
($column)").show(false)
           successCount += 1
           println(s"  ✓ Successfully created index: $indexName")
         } match {
           case Success(_) =>
           case Failure(e) =>
             failCount += 1
             println(s"  ✗ Error creating index $indexName: ${e.getMessage}")
             e.printStackTrace()
         }
       }
       
       println(s"\n  Summary for $tableName:")
       println(s"    - Successfully created: $successCount indices")
       println(s"    - Failed: $failCount indices")
     }
     
     def verifyTable(params: Map[String, Any]): Try[Unit] = Try {
       val tableName = params("tableName").asInstanceOf[String]
       val partitionCol = params("partitionCol").asInstanceOf[String]
       
       println(s"\n=== Verifying $tableName ===")
       
       // Show all indices
       println(s"Current indices for $tableName:")
       spark.sql(s"SHOW INDEXES FROM $tableName").show(false)
       
       // Run test query
       val testQuery = s"SELECT COUNT(*) as row_count FROM $tableName WHERE 
$partitionCol IS NOT NULL LIMIT 10"
       println(s"Test query for $tableName:")
       spark.sql(testQuery).show(false)
       println(s"✓ $tableName verification successful")
     }
   }
   
   // ========================================
   // Main Execution
   // ========================================
   
   // Script-level configuration parameters
   val SCALE = sys.env.getOrElse("SCALE", "1gb")
   val OUTPUT_DIR = sys.env.getOrElse("OUTPUT_DIR",
     
s"/tmp/output_${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"))}")
   val TARGET_PATH = sys.env.getOrElse("TARGET_PATH", 
"file:///Users/zhanyeha/tpcds-test/tpcds_ss")
   val SCRIPT_DIR = sys.env.getOrElse("SCRIPT_DIR", ".")
   
   // Derived configurations
   val format = "hudi"
   val dbName = s"tpcds_hudi_$SCALE".toLowerCase
   val sourcePath = "/Users/zhanyeha/tpcds-test/tpcds_1g"
   
   println(s"========================================")
   println(s"TPC-DS Index Creation Configuration")
   println(s"========================================")
   println(s"Dataset Scale: $SCALE")
   println(s"Database Name: $dbName")
   println(s"Target Path: $TARGET_PATH")
   println(s"========================================")
   
   // Initialize TPCDS helper
   val tpcds = new TPCDS(spark, format, sourcePath, TARGET_PATH, dbName = 
dbName)
   
   // Configure Spark settings
   spark.sql("set hoodie.metadata.enable=true")
   spark.sql("set hoodie.metadata.index.async=true")
   spark.sql("set hoodie.metadata.record.index.enable=true")
   spark.sql("set hoodie.write.concurrency.mode=optimistic_concurrency_control")
   spark.sql("set hoodie.clean.policy.failed.writes=LAZY")
   spark.sql("set 
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider")
   
   println(s"========= Processing catalog_sales table for $SCALE dataset 
=========")
   
   // Step 1: Bootstrap existing tables
   println("\n=== Step 1: Bootstrapping existing tables ===")
   tpcds.bootstrapExistingTables(skip = 
TPCDS.tpcdsAllTables.diff(Seq("catalog_sales")).toSet)
   // tpcds.bootstrapTables(skip = 
TPCDS.tpcdsAllTables.diff(Seq("catalog_sales")).toSet)
   
   // Step 2: Enable metadata
   println("\n=== Step 2: Enabling metadata ===")
   spark.sql(s"USE $dbName").show(false)
   spark.sql("set hoodie.metadata.enable=true").show(false)
   // spark.sql("set 
hoodie.metadata.index.column.stats.enable=true").show(false)
   
   // Execute the flattened procedure pipeline
   var currentTable = "catalog_sales"
   procedurePipeline.foreach { case (procedure, params) =>
     val tableName = params("tableName").asInstanceOf[String]
     
     // Print table header when switching to a new table
     if (tableName != currentTable) {
       println(s"\n================================================")
       println(s"Processing table: $tableName")
       println(s"================================================")
       currentTable = tableName
     }
     
     // Execute procedure using switch-case pattern
     val result = procedure match {
       case Procedure.ROLLBACK_PENDING_INSTANTS =>
         ProcedureFunctions.rollbackPendingInstants(params)
         
       case Procedure.BOOTSTRAP_METADATA =>
         ProcedureFunctions.bootstrapMetadata(params)
         
       case Procedure.DISABLE_METADATA =>
         ProcedureFunctions.disableMetadata(params)
         
       case Procedure.UPGRADE_TABLE_VERSION =>
         ProcedureFunctions.upgradeTableVersion(params)
         
       case Procedure.CREATE_RECORD_INDEX =>
         ProcedureFunctions.createRecordIndex(params)
         
       case Procedure.DROP_SECONDARY_INDICES =>
         ProcedureFunctions.dropSecondaryIndices(params)
         
       case Procedure.CREATE_SECONDARY_INDICES =>
         ProcedureFunctions.createSecondaryIndices(params)
         
       case Procedure.VERIFY_TABLE =>
         ProcedureFunctions.verifyTable(params)
     }
     
     // Handle procedure execution result
     result match {
       case Success(_) =>
         println(s"✓ Procedure ${procedure} completed successfully")
       case Failure(e) =>
         println(s"✗ Procedure ${procedure} failed: ${e.getMessage}")
         e.printStackTrace()
     }
   }
   
   // Final summary
   println(s"\n========= Script execution completed for catalog_sales 
=========")
   println(s"\nDataset SCALE: $SCALE")
   println(s"Database: $dbName")
   
   println("\nProcessed table: catalog_sales")
   println(s"\nTotal procedures executed: ${procedurePipeline.size}")
   println("\nOperations performed:")
   println("  1. Rolled back any pending instants")
   println("  2. Bootstrapped metadata (MDT enabled)")
   println("  3. Upgraded table to version 9")
   println("  4. Created Record Level Index (RLI)")
   println("  5. Dropped all existing secondary indices")
   println("  6. Created secondary indices on: cs_order_number, cs_item_sk")
   println("  7. Verified table configuration")
   
   println("\nNote: Secondary indices created with naming pattern: 
idx_<column_name>")
   
   // Exit spark shell automatically
   println("\nExiting Spark shell...")
   System.exit(0)
   ```
   4.
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   * Hudi version : a749d6b9a557ebead5227ac9421513ac57afde57
   
   * Spark version : 3.5.2
   
   * Hive version :NA
   
   * Hadoop version :NA
   
   * Storage (HDFS/S3/GCS..) : HDFS (run locally on mac)
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   
   ```
   ✗ Procedure UPGRADE_TABLE_VERSION failed: Error while completing streaming 
commit to metadata with instant 20250915110854452
   org.apache.hudi.exception.HoodieException: Error while completing streaming 
commit to metadata with instant 20250915110854452
        at 
org.apache.hudi.client.StreamingMetadataWriteHandler.commitToMetadataTable(StreamingMetadataWriteHandler.java:81)
        at 
org.apache.hudi.client.SparkRDDWriteClient.writeToMetadataTable(SparkRDDWriteClient.java:163)
        at 
org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:317)
        at 
org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:273)
        at 
org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:146)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1000)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:545)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:195)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:213)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
        at 
org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:114)
        at 
org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:72)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
        at 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ProcedureFunctions$.$anonfun$upgradeTableVersion$1(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:307)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.util.Try$.apply(Try.scala:213)
        at 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ProcedureFunctions$.upgradeTableVersion(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:289)
        at 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$new$1(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:502)
        at 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$new$1$adapted(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:479)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:479)
        at 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:619)
        at 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:621)
        at 
$line14.$read$$iw$$iw$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:623)
        at 
$line14.$read$$iw$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:625)
        at 
$line14.$read$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:627)
        at 
$line14.$read$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:629)
        at 
$line14.$read$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:631)
        at 
$line14.$read.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:633)
        at 
$line14.$read$.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:637)
        at 
$line14.$read$.<clinit>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala)
        at 
$line14.$eval$.$print$lzycompute(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:7)
        at 
$line14.$eval$.$print(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:6)
        at 
$line14.$eval.$print(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
        at 
scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
        at 
scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
        at 
scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
        at 
scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
        at 
scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
        at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
        at 
scala.tools.nsc.interpreter.ILoop.$anonfun$pasteCommand$11(ILoop.scala:795)
        at scala.tools.nsc.interpreter.IMain.withLabel(IMain.scala:111)
        at scala.tools.nsc.interpreter.ILoop.interpretCode$1(ILoop.scala:795)
        at scala.tools.nsc.interpreter.ILoop.pasteCommand(ILoop.scala:801)
        at 
org.apache.spark.repl.SparkILoop.$anonfun$process$8(SparkILoop.scala:177)
        at 
org.apache.spark.repl.SparkILoop.$anonfun$process$8$adapted(SparkILoop.scala:176)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
org.apache.spark.repl.SparkILoop.loadInitFiles$1(SparkILoop.scala:176)
        at 
org.apache.spark.repl.SparkILoop.$anonfun$process$4(SparkILoop.scala:166)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.tools.nsc.interpreter.ILoop.$anonfun$mumly$1(ILoop.scala:166)
        at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:206)
        at scala.tools.nsc.interpreter.ILoop.mumly(ILoop.scala:163)
        at org.apache.spark.repl.SparkILoop.loopPostInit$1(SparkILoop.scala:153)
        at 
org.apache.spark.repl.SparkILoop.$anonfun$process$10(SparkILoop.scala:221)
        at 
org.apache.spark.repl.SparkILoop.withSuppressedSettings$1(SparkILoop.scala:189)
        at org.apache.spark.repl.SparkILoop.startup$1(SparkILoop.scala:201)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:236)
        at org.apache.spark.repl.Main$.doMain(Main.scala:78)
        at org.apache.spark.repl.Main$.main(Main.scala:58)
        at org.apache.spark.repl.Main.main(Main.scala)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1029)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.lang.IllegalStateException: Column stats partition must be 
enabled to generate partition stats. Please enable: 
hoodie.metadata.index.column.stats.enable
        at 
org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:78)
        at 
org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecords(HoodieTableMetadataUtil.java:440)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter$BatchMetadataConversionFunction.convertMetadata(HoodieBackedTableMetadataWriter.java:1448)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.prepareAndWriteToNonStreamingPartitions(HoodieBackedTableMetadataWriter.java:1318)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.completeStreamingCommit(HoodieBackedTableMetadataWriter.java:1310)
        at 
org.apache.hudi.client.StreamingMetadataWriteHandler.commitToMetadataTable(StreamingMetadataWriteHandler.java:79)
        ... 110 more
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to