Davis-Zhang-Onehouse opened a new issue, #13893:
URL: https://github.com/apache/hudi/issues/13893
**Describe the problem you faced**
When upgrading a table from version 6 to version 9 via a SQL INSERT, with only the record index (RLI) and files partitions present in the metadata table (MDT), I hit the stack trace below (see the Stacktrace section).
Note that I did not explicitly set hoodie.metadata.index.column.stats.enable to false; bootstrapping the MDT on a version 6 table simply does not create the column stats partition, only the files partition.
Git hash of Hudi that reproduces the issue: a749d6b9a557ebead5227ac9421513ac57afde57
.hoodie folder after the issue happens:
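To confirm which MDT partitions exist before the upgrade, I list what is on disk under .hoodie/metadata (a small sketch, assuming the same spark-shell session and the TableUtils helper from the reproduction script below):
```
// Sketch: list the MDT partitions under <basePath>/.hoodie/metadata to confirm that
// only "files" and "record_index" exist before the upgrade (no "column_stats").
// Assumes the TableUtils helper defined in the reproduction script below.
import org.apache.hadoop.fs.Path
val tablePath = TableUtils.getTablePath(spark, "catalog_sales")
val fs = new Path(tablePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(tablePath, ".hoodie/metadata"))
  .filter(s => !s.getPath.getName.startsWith("."))
  .foreach(s => println(s.getPath.getName))
```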
**To Reproduce**
Steps to reproduce the behavior:
1. Use a recent Hudi master branch with git hash a749d6b9a557ebead5227ac9421513ac57afde57
2. Create a table-version-6 table with the MDT off (an illustrative sketch of this step is included after the script below)
3. Bootstrap the MDT, create the RLI, and upgrade using this script:
```
import scala.io.Source
import org.apache.hudi.benchmarks.TPCDS
import org.apache.spark.sql.SparkSession
import scala.util.{Try, Success, Failure}
import java.io.{File, PrintWriter, FileWriter, BufferedWriter}
import java.time.{LocalDateTime, Instant, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.collection.mutable.{ArrayBuffer, HashSet}
import java.nio.file.{Files, Paths, StandardOpenOption}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.common.HoodiePendingRollbackInfo
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.HoodieCLIUtils
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import scala.collection.JavaConverters._
// ========================================
// Procedure Definitions
// ========================================
object Procedure extends Enumeration {
val ROLLBACK_PENDING_INSTANTS = Value("ROLLBACK_PENDING_INSTANTS")
val BOOTSTRAP_METADATA = Value("BOOTSTRAP_METADATA")
val DISABLE_METADATA = Value("DISABLE_METADATA")
val UPGRADE_TABLE_VERSION = Value("UPGRADE_TABLE_VERSION")
val CREATE_RECORD_INDEX = Value("CREATE_RECORD_INDEX")
val DROP_SECONDARY_INDICES = Value("DROP_SECONDARY_INDICES")
val CREATE_SECONDARY_INDICES = Value("CREATE_SECONDARY_INDICES")
val VERIFY_TABLE = Value("VERIFY_TABLE")
}
val procedurePipeline = Seq(
// First rollback any pending instants
// (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
// "tableName" -> "catalog_sales"
// )),
(Procedure.BOOTSTRAP_METADATA, Map(
"tableName" -> "catalog_sales"
)),
// (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
// "tableName" -> "catalog_sales"
// )),
(Procedure.CREATE_RECORD_INDEX, Map(
"rliGroupCount" -> -1,
"tableName" -> "catalog_sales",
"recordKeys" -> Seq("cs_item_sk", "cs_order_number")
)),
(Procedure.UPGRADE_TABLE_VERSION, Map(
"tableName" -> "catalog_sales"
)),
// (Procedure.CREATE_SECONDARY_INDICES, Map(
// "tableName" -> "catalog_sales",
// "columnsToIndex" -> Seq("cs_order_number", "cs_item_sk",
"cs_sold_date_sk", "cs_bill_customer_sk")
// )),
// (Procedure.VERIFY_TABLE, Map(
// "tableName" -> "catalog_sales",
// "partitionCol" -> "cs_sold_date_sk"
// )),
// // First rollback any pending instants
// (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
// "tableName" -> "store_sales"
// )),
// (Procedure.BOOTSTRAP_METADATA, Map(
// "tableName" -> "store_sales"
// )),
// (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
// "tableName" -> "store_sales"
// )),
// (Procedure.CREATE_RECORD_INDEX, Map(
// "rliGroupCount" -> -1,
// "tableName" -> "store_sales",
// "recordKeys" -> Seq("ss_item_sk", "ss_ticket_number")
// )),
// (Procedure.CREATE_SECONDARY_INDICES, Map(
// "tableName" -> "store_sales",
// "columnsToIndex" -> Seq("ss_store_sk", "ss_item_sk", "ss_customer_sk")
// )),
// (Procedure.VERIFY_TABLE, Map(
// "tableName" -> "store_sales",
// "partitionCol" -> "ss_sold_date_sk"
// )),
// // First rollback any pending instants
// (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
// "tableName" -> "web_sales"
// )),
// (Procedure.BOOTSTRAP_METADATA, Map(
// "tableName" -> "web_sales"
// )),
// (Procedure.ROLLBACK_PENDING_INSTANTS, Map(
// "tableName" -> "web_sales"
// )),
// (Procedure.CREATE_RECORD_INDEX, Map(
// "rliGroupCount" -> -1,
// "tableName" -> "web_sales",
// "recordKeys" -> Seq("ws_item_sk", "ws_order_number")
// )),
// (Procedure.CREATE_SECONDARY_INDICES, Map(
// "tableName" -> "web_sales",
// "columnsToIndex" -> Seq("ws_order_number")
// )),
// (Procedure.VERIFY_TABLE, Map(
// "tableName" -> "web_sales",
// "partitionCol" -> "ws_sold_date_sk"
// )),
)
// ========================================
// Utility Functions
// ========================================
object TableUtils {
def getTablePath(spark: SparkSession, tableName: String): String = {
val tablePathDF = spark.sql(s"DESCRIBE EXTENDED $tableName")
val locationRow = tablePathDF.filter("col_name = 'Location'").collect()
if (locationRow.isEmpty) {
throw new Exception(s"Could not find location for table $tableName")
}
locationRow(0).getString(1)
}
def createWriteClientAndMetaClient(spark: SparkSession, tableName: String): (SparkRDDWriteClient[_], HoodieTableMetaClient) = {
val tablePath = getTablePath(spark, tableName)
// Create write client using HoodieCLIUtils
val writeClient = HoodieCLIUtils.createHoodieWriteClient(
spark,
tablePath,
Map.empty[String, String],
Option(tableName)
).asInstanceOf[SparkRDDWriteClient[_]]
// Create a fresh metaClient after write client creation
val metaClient = HoodieTableMetaClient.reload(
HoodieTableMetaClient.builder()
.setConf(org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
.setBasePath(tablePath)
.build()
)
(writeClient, metaClient)
}
}
// ========================================
// Procedure Functions
// ========================================
object ProcedureFunctions {
def rollbackPendingInstants(params: Map[String, Any]): Try[Unit] = Try {
val tableName = params("tableName").asInstanceOf[String]
println(s"\n=== Rolling back pending instants for $tableName ===")
val tablePath = TableUtils.getTablePath(spark, tableName)
println(s"Table path: $tablePath")
println("Checking for pending/inflight instants...")
// Create a write client to access rollback functionality
var writeClient: SparkRDDWriteClient[_] = null
var metaClient: HoodieTableMetaClient = null
try {
val (client, meta) = TableUtils.createWriteClientAndMetaClient(spark, tableName)
writeClient = client
metaClient = meta
// Use the public rollbackFailedWrites method, which handles everything internally
println("Attempting to rollback failed writes...")
val rollbackPerformed = writeClient.rollbackFailedWrites(metaClient)
if (rollbackPerformed) {
println("✓ Successfully rolled back failed writes")
} else {
println("No failed writes found to rollback")
}
// Run clean operation after rollback using write client
println("\n=== Running clean operation after rollback ===")
Try {
// Run clean using write client
val cleanResult = writeClient.clean()
} match {
case Success(_) =>
case Failure(e) =>
println(s"Clean operation warning: ${e.getMessage}")
e.printStackTrace()
}
} catch {
case e: Exception =>
println(s"Error during rollback: ${e.getMessage}")
e.printStackTrace()
// Don't rethrow as this is a best-effort operation
} finally {
if (writeClient != null) {
Try(writeClient.close())
}
}
}
def bootstrapMetadata(params: Map[String, Any]): Try[Unit] = Try {
val tableName = params("tableName").asInstanceOf[String]
val tablePath = TableUtils.getTablePath(spark, tableName)
val metaClient = HoodieTableMetaClient.reload(
HoodieTableMetaClient.builder()
.setConf(org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
.setBasePath(tablePath)
.build()
)
println(s"\n=== Dummy dml to trigger MDT bootstrap $tableName ===")
println(s"Running dummy INSERT to bootstrap metadata...")
spark.sql(s"set hoodie.metadata.enable=true").show(false)
spark.sql(s"set
hoodie.write.table.version=${metaClient.getTableConfig().getTableVersion().versionCode()}")
.show(false)
// Get table schema to create empty row with proper types
val tableDF = spark.table(tableName)
val schema = tableDF.schema
// Create an empty DataFrame with the same schema
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema)
emptyDF.createOrReplaceTempView("empty_data")
// Insert empty data (0 rows) to trigger metadata bootstrap
spark.sql(s"INSERT INTO $tableName SELECT * FROM empty_data").show(false)
println(s"Successfully bootstrapped metadata for $tableName")
}
def disableMetadata(params: Map[String, Any]): Try[Unit] = Try {
val tableName = params("tableName").asInstanceOf[String]
println(s"\n=== Temporarily disabling metadata for $tableName ===")
// Disable metadata
println(s"Disabling metadata...")
spark.sql(s"set hoodie.metadata.enable=false").show(false)
// Get table schema to create empty row with proper types
val tableDF = spark.table(tableName)
val schema = tableDF.schema
// Create an empty DataFrame with the same schema
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema)
emptyDF.createOrReplaceTempView("empty_disable_data")
// Insert empty data (0 rows) with metadata disabled
println(s"Running dummy INSERT with metadata disabled...")
spark.sql(s"INSERT INTO $tableName SELECT * FROM
empty_disable_data").show(false)
// Re-enable metadata
println(s"Re-enabling metadata...")
spark.sql(s"set hoodie.metadata.enable=true").show(false)
println(s"Successfully completed metadata disable/enable cycle for
$tableName")
}
def upgradeTableVersion(params: Map[String, Any]): Try[Unit] = Try {
val tableName = params("tableName").asInstanceOf[String]
println(s"\n=== Upgrading $tableName to version 9 ===")
spark.sql("set hoodie.write.table.version=9").show(false)
spark.sql("set hoodie.write.auto.upgrade=true").show(false)
println(s"Running dummy INSERT to trigger upgrade...")
// Get table schema to create empty row with proper types
val tableDF = spark.table(tableName)
val schema = tableDF.schema
// Create an empty DataFrame with the same schema
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema)
emptyDF.createOrReplaceTempView("empty_upgrade_data")
// Insert empty data (0 rows) to trigger table upgrade
spark.sql(s"INSERT INTO $tableName SELECT * FROM
empty_upgrade_data").show(false)
println(s"Successfully triggered upgrade for $tableName")
}
def createRecordIndex(params: Map[String, Any]): Try[Unit] = Try {
val tableName = params("tableName").asInstanceOf[String]
val recordKeys = params("recordKeys").asInstanceOf[Seq[String]]
val rliGroupCount = params("rliGroupCount").asInstanceOf[Int]
println(s"\n=== Creating record index (RLI) for $tableName ===")
val indicesDF = spark.sql(s"SHOW INDEXES FROM $tableName")
val indices = indicesDF.collect()
val hasRecordIndex = indices.exists(row => row.getString(0) == "record_index")
if (!hasRecordIndex) {
val recordKeysStr = recordKeys.mkString(", ")
println(s"Creating record index on columns: $recordKeysStr")
// Set RLI configuration parameters
println("Setting RLI configuration parameters...")
spark.sql("set
hoodie.metadata.record.index.min.filegroup.count=10").show(false)
spark.sql("set
hoodie.metadata.record.index.max.filegroup.count=1000000").show(false)
if (rliGroupCount > 0) {
spark.sql(s"set
hoodie.metadata.record.index.min.filegroup.count=$rliGroupCount").show(false)
spark.sql(s"set
hoodie.metadata.record.index.max.filegroup.count=$rliGroupCount").show(false)
}
spark.sql(s"CREATE INDEX record_index ON $tableName
($recordKeysStr)").show(false)
println(s"Successfully created record index for $tableName")
// Reset RLI configuration parameters
spark.sql("set
hoodie.metadata.record.index.min.filegroup.count=10").show(false)
spark.sql("set
hoodie.metadata.record.index.max.filegroup.count=1000000").show(false)
} else {
println(s"Record index already exists for $tableName")
}
}
def dropSecondaryIndices(params: Map[String, Any]): Try[Unit] = Try {
val tableName = params("tableName").asInstanceOf[String]
println(s"\n=== Dropping existing secondary indices for $tableName ===")
val indicesDF = spark.sql(s"SHOW INDEXES FROM $tableName")
val indices = indicesDF.collect()
var droppedCount = 0
indices.foreach { row =>
val indexName = row.getString(0)
// Drop indices that start with idx_ or secondary_index_
if (indexName.startsWith("idx_") || indexName.startsWith("secondary_index_")) {
Try {
spark.sql(s"DROP INDEX $indexName ON $tableName").show(false)
println(s" Dropped index: $indexName")
droppedCount += 1
} match {
case Success(_) =>
case Failure(e) =>
println(s" Error dropping index $indexName: ${e.getMessage}")
e.printStackTrace()
}
}
}
if (droppedCount == 0) {
println(" No secondary indices to drop")
} else {
println(s" Dropped $droppedCount indices")
}
}
def createSecondaryIndices(params: Map[String, Any]): Try[Unit] = Try {
val tableName = params("tableName").asInstanceOf[String]
val columnsToIndex = params("columnsToIndex").asInstanceOf[Seq[String]]
println(s"\n=== Creating secondary indices for $tableName ===")
println(s" Creating indices for ${columnsToIndex.size} columns")
var successCount = 0
var failCount = 0
columnsToIndex.foreach { column =>
val indexName = s"idx_$column"
Try {
println(s" Creating index: $indexName")
spark.sql(s"CREATE INDEX $indexName ON $tableName
($column)").show(false)
successCount += 1
println(s" ✓ Successfully created index: $indexName")
} match {
case Success(_) =>
case Failure(e) =>
failCount += 1
println(s" ✗ Error creating index $indexName: ${e.getMessage}")
e.printStackTrace()
}
}
println(s"\n Summary for $tableName:")
println(s" - Successfully created: $successCount indices")
println(s" - Failed: $failCount indices")
}
def verifyTable(params: Map[String, Any]): Try[Unit] = Try {
val tableName = params("tableName").asInstanceOf[String]
val partitionCol = params("partitionCol").asInstanceOf[String]
println(s"\n=== Verifying $tableName ===")
// Show all indices
println(s"Current indices for $tableName:")
spark.sql(s"SHOW INDEXES FROM $tableName").show(false)
// Run test query
val testQuery = s"SELECT COUNT(*) as row_count FROM $tableName WHERE $partitionCol IS NOT NULL LIMIT 10"
println(s"Test query for $tableName:")
spark.sql(testQuery).show(false)
println(s"✓ $tableName verification successful")
}
}
// ========================================
// Main Execution
// ========================================
// Script-level configuration parameters
val SCALE = sys.env.getOrElse("SCALE", "1gb")
val OUTPUT_DIR = sys.env.getOrElse("OUTPUT_DIR", s"/tmp/output_${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"))}")
val TARGET_PATH = sys.env.getOrElse("TARGET_PATH", "file:///Users/zhanyeha/tpcds-test/tpcds_ss")
val SCRIPT_DIR = sys.env.getOrElse("SCRIPT_DIR", ".")
// Derived configurations
val format = "hudi"
val dbName = s"tpcds_hudi_$SCALE".toLowerCase
val sourcePath = "/Users/zhanyeha/tpcds-test/tpcds_1g"
println(s"========================================")
println(s"TPC-DS Index Creation Configuration")
println(s"========================================")
println(s"Dataset Scale: $SCALE")
println(s"Database Name: $dbName")
println(s"Target Path: $TARGET_PATH")
println(s"========================================")
// Initialize TPCDS helper
val tpcds = new TPCDS(spark, format, sourcePath, TARGET_PATH, dbName = dbName)
// Configure Spark settings
spark.sql("set hoodie.metadata.enable=true")
spark.sql("set hoodie.metadata.index.async=true")
spark.sql("set hoodie.metadata.record.index.enable=true")
spark.sql("set hoodie.write.concurrency.mode=optimistic_concurrency_control")
spark.sql("set hoodie.clean.policy.failed.writes=LAZY")
spark.sql("set
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider")
println(s"========= Processing catalog_sales table for $SCALE dataset
=========")
// Step 1: Bootstrap existing tables
println("\n=== Step 1: Bootstrapping existing tables ===")
tpcds.bootstrapExistingTables(skip = TPCDS.tpcdsAllTables.diff(Seq("catalog_sales")).toSet)
// tpcds.bootstrapTables(skip = TPCDS.tpcdsAllTables.diff(Seq("catalog_sales")).toSet)
// Step 2: Enable metadata
println("\n=== Step 2: Enabling metadata ===")
spark.sql(s"USE $dbName").show(false)
spark.sql("set hoodie.metadata.enable=true").show(false)
// spark.sql("set
hoodie.metadata.index.column.stats.enable=true").show(false)
// Execute the flattened procedure pipeline
var currentTable = "catalog_sales"
procedurePipeline.foreach { case (procedure, params) =>
val tableName = params("tableName").asInstanceOf[String]
// Print table header when switching to a new table
if (tableName != currentTable) {
println(s"\n================================================")
println(s"Processing table: $tableName")
println(s"================================================")
currentTable = tableName
}
// Execute procedure using switch-case pattern
val result = procedure match {
case Procedure.ROLLBACK_PENDING_INSTANTS =>
ProcedureFunctions.rollbackPendingInstants(params)
case Procedure.BOOTSTRAP_METADATA =>
ProcedureFunctions.bootstrapMetadata(params)
case Procedure.DISABLE_METADATA =>
ProcedureFunctions.disableMetadata(params)
case Procedure.UPGRADE_TABLE_VERSION =>
ProcedureFunctions.upgradeTableVersion(params)
case Procedure.CREATE_RECORD_INDEX =>
ProcedureFunctions.createRecordIndex(params)
case Procedure.DROP_SECONDARY_INDICES =>
ProcedureFunctions.dropSecondaryIndices(params)
case Procedure.CREATE_SECONDARY_INDICES =>
ProcedureFunctions.createSecondaryIndices(params)
case Procedure.VERIFY_TABLE =>
ProcedureFunctions.verifyTable(params)
}
// Handle procedure execution result
result match {
case Success(_) =>
println(s"✓ Procedure ${procedure} completed successfully")
case Failure(e) =>
println(s"✗ Procedure ${procedure} failed: ${e.getMessage}")
e.printStackTrace()
}
}
// Final summary
println(s"\n========= Script execution completed for catalog_sales
=========")
println(s"\nDataset SCALE: $SCALE")
println(s"Database: $dbName")
println("\nProcessed table: catalog_sales")
println(s"\nTotal procedures executed: ${procedurePipeline.size}")
println("\nOperations performed:")
println(" 1. Rolled back any pending instants")
println(" 2. Bootstrapped metadata (MDT enabled)")
println(" 3. Upgraded table to version 9")
println(" 4. Created Record Level Index (RLI)")
println(" 5. Dropped all existing secondary indices")
println(" 6. Created secondary indices on: cs_order_number, cs_item_sk")
println(" 7. Verified table configuration")
println("\nNote: Secondary indices created with naming pattern:
idx_<column_name>")
// Exit spark shell automatically
println("\nExiting Spark shell...")
System.exit(0)
```
4. Observe the UPGRADE_TABLE_VERSION step fail with the stack trace below.
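Step 2 above is not part of the script; a minimal sketch of how such a version 6 table with the MDT disabled can be created (illustrative only: the source view and key columns here are placeholders, and my actual tables were created by the TPCDS helper used in the script):
```
// Illustrative sketch only: create a table at table version 6 with the MDT disabled.
// catalog_sales_src is a placeholder for the source data view; the key columns match
// the record keys used later for the record index.
spark.sql("set hoodie.metadata.enable=false")   // keep the metadata table off
spark.sql("set hoodie.write.table.version=6")   // write the table at table version 6
spark.sql(
  """CREATE TABLE catalog_sales USING hudi
    |TBLPROPERTIES (primaryKey = 'cs_item_sk,cs_order_number')
    |AS SELECT * FROM catalog_sales_src""".stripMargin)
```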
**Expected behavior**
The upgrade from table version 6 to version 9 should complete successfully even though the MDT only contains the files and record index partitions; it should not require hoodie.metadata.index.column.stats.enable to be enabled.
**Environment Description**
* Hudi version : a749d6b9a557ebead5227ac9421513ac57afde57
* Spark version : 3.5.2
* Hive version : NA
* Hadoop version : NA
* Storage (HDFS/S3/GCS..) : HDFS (run locally on mac)
* Running on Docker? (yes/no) : no
**Additional context**
From the Caused by section of the stack trace, the streaming metadata commit for the upgrade-triggering INSERT reaches HoodieTableMetadataUtil.convertMetadataToRecords, which fails a ValidationUtils.checkState check because partition stats generation requires the column stats partition to be enabled.
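Given that message, a possible workaround (not verified on my side) might be to enable the column stats index explicitly before the INSERT that triggers the upgrade, for example:
```
// Untested workaround sketch: enable the column stats index before the dummy INSERT
// that triggers the upgrade, since the failing check asks for
// hoodie.metadata.index.column.stats.enable to be enabled.
spark.sql("set hoodie.metadata.index.column.stats.enable=true")
spark.sql("set hoodie.write.table.version=9")
spark.sql("set hoodie.write.auto.upgrade=true")
spark.sql("INSERT INTO catalog_sales SELECT * FROM empty_upgrade_data")
```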
**Stacktrace**
```
✗ Procedure UPGRADE_TABLE_VERSION failed: Error while completing streaming commit to metadata with instant 20250915110854452
org.apache.hudi.exception.HoodieException: Error while completing streaming commit to metadata with instant 20250915110854452
    at org.apache.hudi.client.StreamingMetadataWriteHandler.commitToMetadataTable(StreamingMetadataWriteHandler.java:81)
    at org.apache.hudi.client.SparkRDDWriteClient.writeToMetadataTable(SparkRDDWriteClient.java:163)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:317)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:273)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:146)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1000)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:545)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:195)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:213)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:114)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:72)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ProcedureFunctions$.$anonfun$upgradeTableVersion$1(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ProcedureFunctions$.upgradeTableVersion(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:289)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$new$1(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:502)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$new$1$adapted(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:479)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:479)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:619)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:621)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:623)
    at $line14.$read$$iw$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:625)
    at $line14.$read$$iw$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:627)
    at $line14.$read$$iw$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:629)
    at $line14.$read$$iw.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:631)
    at $line14.$read.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:633)
    at $line14.$read$.<init>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:637)
    at $line14.$read$.<clinit>(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala)
    at $line14.$eval$.$print$lzycompute(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:7)
    at $line14.$eval$.$print(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala:6)
    at $line14.$eval.$print(/Users/zhanyeha/operations/tpcdsPerf/benchHistory/sept15/emrDropAllSIAndIndexAllTables/index_refactored.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
    at scala.tools.nsc.interpreter.ILoop.$anonfun$pasteCommand$11(ILoop.scala:795)
    at scala.tools.nsc.interpreter.IMain.withLabel(IMain.scala:111)
    at scala.tools.nsc.interpreter.ILoop.interpretCode$1(ILoop.scala:795)
    at scala.tools.nsc.interpreter.ILoop.pasteCommand(ILoop.scala:801)
    at org.apache.spark.repl.SparkILoop.$anonfun$process$8(SparkILoop.scala:177)
    at org.apache.spark.repl.SparkILoop.$anonfun$process$8$adapted(SparkILoop.scala:176)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.repl.SparkILoop.loadInitFiles$1(SparkILoop.scala:176)
    at org.apache.spark.repl.SparkILoop.$anonfun$process$4(SparkILoop.scala:166)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.tools.nsc.interpreter.ILoop.$anonfun$mumly$1(ILoop.scala:166)
    at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:206)
    at scala.tools.nsc.interpreter.ILoop.mumly(ILoop.scala:163)
    at org.apache.spark.repl.SparkILoop.loopPostInit$1(SparkILoop.scala:153)
    at org.apache.spark.repl.SparkILoop.$anonfun$process$10(SparkILoop.scala:221)
    at org.apache.spark.repl.SparkILoop.withSuppressedSettings$1(SparkILoop.scala:189)
    at org.apache.spark.repl.SparkILoop.startup$1(SparkILoop.scala:201)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:236)
    at org.apache.spark.repl.Main$.doMain(Main.scala:78)
    at org.apache.spark.repl.Main$.main(Main.scala:58)
    at org.apache.spark.repl.Main.main(Main.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1029)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalStateException: Column stats partition must be enabled to generate partition stats. Please enable: hoodie.metadata.index.column.stats.enable
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:78)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecords(HoodieTableMetadataUtil.java:440)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter$BatchMetadataConversionFunction.convertMetadata(HoodieBackedTableMetadataWriter.java:1448)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.prepareAndWriteToNonStreamingPartitions(HoodieBackedTableMetadataWriter.java:1318)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.completeStreamingCommit(HoodieBackedTableMetadataWriter.java:1310)
    at org.apache.hudi.client.StreamingMetadataWriteHandler.commitToMetadataTable(StreamingMetadataWriteHandler.java:79)
    ... 110 more
```