[
https://issues.apache.org/jira/browse/HUDI-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shawn Chang updated HUDI-8899:
------------------------------
Description:
Hudi 1.0's backward writer cannot turn the metadata table (MDT) off on a Hudi
0.14 table that has MDT enabled, even when setting
`.option("hoodie.metadata.enable", "false")`.
Reproduction steps:
# Create the table with 0.14.0
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.SaveMode
val df1 = Seq(
(100, "2015-01-01", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
(101, "2015-01-01", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
(102, "2015-01-01", "event_name_345", "2015-01-01T13:51:40.417052Z", "type3"),
(103, "2015-01-01", "event_name_234", "2015-01-01T13:51:40.519832Z", "type4"),
(104, "2015-01-01", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
(105, "2015-01-01", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2"),
(106, "2015-01-01", "event_name_890", "2015-01-01T13:51:44.735360Z", "type3"),
(107, "2015-01-01", "event_name_944", "2015-01-01T13:51:45.019544Z", "type4"),
(108, "2015-01-01", "event_name_456", "2015-01-01T13:51:45.208007Z", "type1"),
(109, "2015-01-01", "event_name_567", "2015-01-01T13:51:45.369689Z", "type2"),
(110, "2015-01-01", "event_name_789", "2015-01-01T12:15:05.664947Z", "type3"),
(111, "2015-01-01", "event_name_322", "2015-01-01T13:51:47.388239Z", "type4")
).toDF("event_id", "event_date", "event_name", "event_ts", "event_type")
val r = scala.util.Random
val num = r.nextInt(99999)
var tableName = "yxchang_hudi_cow_simple_14_" + num
var tablePath = "s3://<bucket>/" + tableName + "/"
df1.write.format("hudi")
.option("hoodie.metadata.enable", "true")
.option("hoodie.table.name", tableName)
.option("hoodie.datasource.write.operation", "insert") // use insert
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.option("hoodie.datasource.write.recordkey.field", "event_id,event_date")
.option("hoodie.datasource.write.partitionpath.field", "event_type")
.option("hoodie.datasource.write.precombine.field", "event_ts")
.option("hoodie.datasource.write.keygenerator.class",
"org.apache.hudi.keygen.ComplexKeyGenerator")
.option("hoodie.datasource.hive_sync.enable", "true")
.option("hoodie.datasource.meta.sync.enable", "true")
.option("hoodie.datasource.hive_sync.mode", "hms")
.option("hoodie.datasource.hive_sync.database", "yxchang_nolf")
.option("hoodie.datasource.hive_sync.table", tableName)
.option("hoodie.datasource.hive_sync.partition_fields", "event_type")
.option("hoodie.datasource.hive_sync.partition_extractor_class",
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
.mode(SaveMode.Append)
.save(tablePath) {code}
2. Use Hudi 1.0 backward writer + Spark 3.5 to append data to this table and
set .option("hoodie.metadata.enable", "false")
{code:java}
val appendDf = Seq(
(142, "2015-01-02", "event_name_922", "2015-01-01T13:51:39.340396Z", "type1"),
(143, "2015-01-03", "event_name_533", "2015-01-01T12:14:58.597216Z", "type2"),
(124, "2015-01-04", "event_name_344", "2015-01-01T13:51:40.417052Z", "type3"),
(125, "2015-01-05", "event_name_266", "2015-01-01T13:51:40.519832Z", "type4"),
(126, "2015-01-06", "event_name_177", "2015-01-01T12:15:00.512679Z", "type1"),
(127, "2015-01-07", "event_name_688", "2015-01-01T13:51:42.248818Z", "type2"),
(128, "2015-01-08", "event_name_891", "2015-01-01T13:51:44.735360Z", "type3"),
(129, "2015-01-09", "event_name_945", "2015-01-01T13:51:45.019544Z", "type4"),
(120, "2015-01-10", "event_name_450", "2015-01-01T13:51:45.208007Z", "type1"),
(131, "2015-01-11", "event_name_562", "2015-01-01T13:51:45.369689Z", "type2"),
(132, "2015-01-12", "event_name_786", "2015-01-01T12:15:05.664947Z", "type3"),
(133, "2015-01-13", "event_name_328", "2015-01-01T13:51:47.388239Z", "type4")
).toDF("event_id", "event_date", "event_name", "event_ts", "event_type")
appendDf.write.format("hudi")
.option("hoodie.metadata.enable", "false")
.option("hoodie.table.name", tableName)
.option("hoodie.table.version", 6)
.option("hoodie.write.table.version", 6)
.option("hoodie.table.initial.version", 6)
.option("hoodie.datasource.write.operation", "insert") // use insert
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.option("hoodie.datasource.write.recordkey.field", "event_id,event_date")
.option("hoodie.datasource.write.partitionpath.field", "event_type")
.option("hoodie.datasource.write.precombine.field", "event_ts")
.option("hoodie.datasource.write.keygenerator.class",
"org.apache.hudi.keygen.ComplexKeyGenerator")
.option("hoodie.datasource.hive_sync.enable", "true")
.option("hoodie.datasource.meta.sync.enable", "true")
.option("hoodie.datasource.hive_sync.mode", "hms")
.option("hoodie.datasource.hive_sync.database", "yxchang_nolf")
.option("hoodie.datasource.hive_sync.table", tableName)
.option("hoodie.datasource.hive_sync.partition_fields", "event_type")
.option("hoodie.datasource.hive_sync.partition_extractor_class",
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
.mode(SaveMode.Append)
.save(tablePath) {code}
Exception:
{code:java}
org.apache.hudi.exception.HoodieException: Error limiting instant archival
based on metadata table
at
org.apache.hudi.client.timeline.versioning.v1.TimelineArchiverV1.getInstantsToArchive(TimelineArchiverV1.java:309)
at
org.apache.hudi.client.timeline.versioning.v1.TimelineArchiverV1.archiveIfRequired(TimelineArchiverV1.java:142)
at
org.apache.hudi.client.BaseHoodieTableServiceClient.archive(BaseHoodieTableServiceClient.java:834)
at
org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:887)
at
org.apache.hudi.client.BaseHoodieWriteClient.autoArchiveOnCommit(BaseHoodieWriteClient.java:616)
at
org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:582)
at
org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:258)
at
org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:93)
at
org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:992)
at
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:534)
at
org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:190)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:157)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$10(SQLExecution.scala:220)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:220)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:405)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:83)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
at
org.apache.spark.sql.adapter.BaseSpark3Adapter.sqlExecutionWithNewExecutionId(BaseSpark3Adapter.scala:105)
at
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:212)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:127)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:170)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:157)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$10(SQLExecution.scala:220)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:220)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:405)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:83)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:520)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:520)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:303)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:299)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:496)
at
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
at
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:164)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:884)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:405)
at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:365)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:244)
... 55 elided
Caused by: java.lang.UnsupportedOperationException
at
org.apache.hudi.metadata.FileSystemBackedTableMetadata.getLatestCompactionTime(FileSystemBackedTableMetadata.java:275)
at
org.apache.hudi.client.timeline.versioning.v1.TimelineArchiverV1.getInstantsToArchive(TimelineArchiverV1.java:299)
... 117 more {code}
was:
Hudi 1.0's backward writer cannot turn the metadata table (MDT) off on a Hudi
0.14 table that has MDT enabled, even when setting
`.option("hoodie.metadata.enable", "false")`.
Reproduction steps:
# Create the table with 0.14.0
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.SaveMode
val df1 = Seq(
(100, "2015-01-01", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
(101, "2015-01-01", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
(102, "2015-01-01", "event_name_345", "2015-01-01T13:51:40.417052Z", "type3"),
(103, "2015-01-01", "event_name_234", "2015-01-01T13:51:40.519832Z", "type4"),
(104, "2015-01-01", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
(105, "2015-01-01", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2"),
(106, "2015-01-01", "event_name_890", "2015-01-01T13:51:44.735360Z", "type3"),
(107, "2015-01-01", "event_name_944", "2015-01-01T13:51:45.019544Z", "type4"),
(108, "2015-01-01", "event_name_456", "2015-01-01T13:51:45.208007Z", "type1"),
(109, "2015-01-01", "event_name_567", "2015-01-01T13:51:45.369689Z", "type2"),
(110, "2015-01-01", "event_name_789", "2015-01-01T12:15:05.664947Z", "type3"),
(111, "2015-01-01", "event_name_322", "2015-01-01T13:51:47.388239Z", "type4")
).toDF("event_id", "event_date", "event_name", "event_ts", "event_type")
val r = scala.util.Random
val num = r.nextInt(99999)
var tableName = "yxchang_hudi_cow_simple_14_" + num
var tablePath = "s3://<bucket>/" + tableName + "/"
df1.write.format("hudi")
.option("hoodie.metadata.enable", "true")
.option("hoodie.table.name", tableName)
.option("hoodie.datasource.write.operation", "insert") // use insert
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.option("hoodie.datasource.write.recordkey.field", "event_id,event_date")
.option("hoodie.datasource.write.partitionpath.field", "event_type")
.option("hoodie.datasource.write.precombine.field", "event_ts")
.option("hoodie.datasource.write.keygenerator.class",
"org.apache.hudi.keygen.ComplexKeyGenerator")
.option("hoodie.datasource.hive_sync.enable", "true")
.option("hoodie.datasource.meta.sync.enable", "true")
.option("hoodie.datasource.hive_sync.mode", "hms")
.option("hoodie.datasource.hive_sync.database", "yxchang_nolf")
.option("hoodie.datasource.hive_sync.table", tableName)
.option("hoodie.datasource.hive_sync.partition_fields", "event_type")
.option("hoodie.datasource.hive_sync.partition_extractor_class",
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
.mode(SaveMode.Append)
.save(tablePath) {code}
2. Use Hudi 1.0 backward writer + Spark 3.5 to append data to this table and
set .option("hoodie.metadata.enable", "false")
{code:java}
val appendDf = Seq(
(142, "2015-01-02", "event_name_922", "2015-01-01T13:51:39.340396Z", "type1"),
(143, "2015-01-03", "event_name_533", "2015-01-01T12:14:58.597216Z", "type2"),
(124, "2015-01-04", "event_name_344", "2015-01-01T13:51:40.417052Z", "type3"),
(125, "2015-01-05", "event_name_266", "2015-01-01T13:51:40.519832Z", "type4"),
(126, "2015-01-06", "event_name_177", "2015-01-01T12:15:00.512679Z", "type1"),
(127, "2015-01-07", "event_name_688", "2015-01-01T13:51:42.248818Z", "type2"),
(128, "2015-01-08", "event_name_891", "2015-01-01T13:51:44.735360Z", "type3"),
(129, "2015-01-09", "event_name_945", "2015-01-01T13:51:45.019544Z", "type4"),
(120, "2015-01-10", "event_name_450", "2015-01-01T13:51:45.208007Z", "type1"),
(131, "2015-01-11", "event_name_562", "2015-01-01T13:51:45.369689Z", "type2"),
(132, "2015-01-12", "event_name_786", "2015-01-01T12:15:05.664947Z", "type3"),
(133, "2015-01-13", "event_name_328", "2015-01-01T13:51:47.388239Z", "type4")
).toDF("event_id", "event_date", "event_name", "event_ts", "event_type")
appendDf.write.format("hudi")
.option("hoodie.metadata.enable", "false")
.option("hoodie.table.name", tableName)
.option("hoodie.table.version", 6)
.option("hoodie.write.table.version", 6)
.option("hoodie.table.initial.version", 6)
.option("hoodie.datasource.write.operation", "insert") // use insert
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
// .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
// .option("hoodie.compact.inline", inlineCompaction)
// .option("hoodie.compact.inline.max.delta.commits", 1)
.option("hoodie.datasource.write.recordkey.field", "event_id,event_date")
.option("hoodie.datasource.write.partitionpath.field", "event_type")
.option("hoodie.datasource.write.precombine.field", "event_ts")
.option("hoodie.datasource.write.keygenerator.class",
"org.apache.hudi.keygen.ComplexKeyGenerator")
.option("hoodie.datasource.hive_sync.enable", "true")
.option("hoodie.datasource.meta.sync.enable", "true")
.option("hoodie.datasource.hive_sync.mode", "hms")
.option("hoodie.datasource.hive_sync.database", "yxchang_nolf")
.option("hoodie.datasource.hive_sync.table", tableName)
.option("hoodie.datasource.hive_sync.partition_fields", "event_type")
.option("hoodie.datasource.hive_sync.partition_extractor_class",
"org.apache.hudi.hive.MultiPartKeysValueExtractor")
.mode(SaveMode.Append)
.save(tablePath) {code}
> Hudi 1.0 backward writer is not able to turn off MDT
> ----------------------------------------------------
>
> Key: HUDI-8899
> URL: https://issues.apache.org/jira/browse/HUDI-8899
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Shawn Chang
> Priority: Major
>
> Hudi 1.0's backward writer cannot turn the metadata table (MDT) off on a Hudi
> 0.14 table that has MDT enabled, even when setting
> `.option("hoodie.metadata.enable", "false")`.
>
> Reproduction steps:
> # Create the table with 0.14.0
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.spark.sql.SaveMode
> val df1 = Seq(
> (100, "2015-01-01", "event_name_900", "2015-01-01T13:51:39.340396Z",
> "type1"),
> (101, "2015-01-01", "event_name_546", "2015-01-01T12:14:58.597216Z",
> "type2"),
> (102, "2015-01-01", "event_name_345", "2015-01-01T13:51:40.417052Z",
> "type3"),
> (103, "2015-01-01", "event_name_234", "2015-01-01T13:51:40.519832Z",
> "type4"),
> (104, "2015-01-01", "event_name_123", "2015-01-01T12:15:00.512679Z",
> "type1"),
> (105, "2015-01-01", "event_name_678", "2015-01-01T13:51:42.248818Z",
> "type2"),
> (106, "2015-01-01", "event_name_890", "2015-01-01T13:51:44.735360Z",
> "type3"),
> (107, "2015-01-01", "event_name_944", "2015-01-01T13:51:45.019544Z",
> "type4"),
> (108, "2015-01-01", "event_name_456", "2015-01-01T13:51:45.208007Z",
> "type1"),
> (109, "2015-01-01", "event_name_567", "2015-01-01T13:51:45.369689Z",
> "type2"),
> (110, "2015-01-01", "event_name_789", "2015-01-01T12:15:05.664947Z",
> "type3"),
> (111, "2015-01-01", "event_name_322", "2015-01-01T13:51:47.388239Z", "type4")
> ).toDF("event_id", "event_date", "event_name", "event_ts", "event_type")
> val r = scala.util.Random
> val num = r.nextInt(99999)
> var tableName = "yxchang_hudi_cow_simple_14_" + num
> var tablePath = "s3://<bucket>/" + tableName + "/"
> df1.write.format("hudi")
> .option("hoodie.metadata.enable", "true")
> .option("hoodie.table.name", tableName)
> .option("hoodie.datasource.write.operation", "insert") // use insert
> .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
> .option("hoodie.datasource.write.recordkey.field", "event_id,event_date")
> .option("hoodie.datasource.write.partitionpath.field", "event_type")
> .option("hoodie.datasource.write.precombine.field", "event_ts")
> .option("hoodie.datasource.write.keygenerator.class",
> "org.apache.hudi.keygen.ComplexKeyGenerator")
> .option("hoodie.datasource.hive_sync.enable", "true")
> .option("hoodie.datasource.meta.sync.enable", "true")
> .option("hoodie.datasource.hive_sync.mode", "hms")
> .option("hoodie.datasource.hive_sync.database", "yxchang_nolf")
> .option("hoodie.datasource.hive_sync.table", tableName)
> .option("hoodie.datasource.hive_sync.partition_fields", "event_type")
> .option("hoodie.datasource.hive_sync.partition_extractor_class",
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
> .mode(SaveMode.Append)
> .save(tablePath) {code}
> 2. Use Hudi 1.0 backward writer + Spark 3.5 to append data to this table and
> set .option("hoodie.metadata.enable", "false")
> {code:java}
> val appendDf = Seq(
> (142, "2015-01-02", "event_name_922", "2015-01-01T13:51:39.340396Z",
> "type1"),
> (143, "2015-01-03", "event_name_533", "2015-01-01T12:14:58.597216Z",
> "type2"),
> (124, "2015-01-04", "event_name_344", "2015-01-01T13:51:40.417052Z",
> "type3"),
> (125, "2015-01-05", "event_name_266", "2015-01-01T13:51:40.519832Z",
> "type4"),
> (126, "2015-01-06", "event_name_177", "2015-01-01T12:15:00.512679Z",
> "type1"),
> (127, "2015-01-07", "event_name_688", "2015-01-01T13:51:42.248818Z",
> "type2"),
> (128, "2015-01-08", "event_name_891", "2015-01-01T13:51:44.735360Z",
> "type3"),
> (129, "2015-01-09", "event_name_945", "2015-01-01T13:51:45.019544Z",
> "type4"),
> (120, "2015-01-10", "event_name_450", "2015-01-01T13:51:45.208007Z",
> "type1"),
> (131, "2015-01-11", "event_name_562", "2015-01-01T13:51:45.369689Z",
> "type2"),
> (132, "2015-01-12", "event_name_786", "2015-01-01T12:15:05.664947Z",
> "type3"),
> (133, "2015-01-13", "event_name_328", "2015-01-01T13:51:47.388239Z", "type4")
> ).toDF("event_id", "event_date", "event_name", "event_ts", "event_type")
>
> appendDf.write.format("hudi")
> .option("hoodie.metadata.enable", "false")
> .option("hoodie.table.name", tableName)
> .option("hoodie.table.version", 6)
> .option("hoodie.write.table.version", 6)
> .option("hoodie.table.initial.version", 6)
> .option("hoodie.datasource.write.operation", "insert") // use insert
> .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
> .option("hoodie.datasource.write.recordkey.field", "event_id,event_date")
> .option("hoodie.datasource.write.partitionpath.field", "event_type")
> .option("hoodie.datasource.write.precombine.field", "event_ts")
> .option("hoodie.datasource.write.keygenerator.class",
> "org.apache.hudi.keygen.ComplexKeyGenerator")
> .option("hoodie.datasource.hive_sync.enable", "true")
> .option("hoodie.datasource.meta.sync.enable", "true")
> .option("hoodie.datasource.hive_sync.mode", "hms")
> .option("hoodie.datasource.hive_sync.database", "yxchang_nolf")
> .option("hoodie.datasource.hive_sync.table", tableName)
> .option("hoodie.datasource.hive_sync.partition_fields", "event_type")
> .option("hoodie.datasource.hive_sync.partition_extractor_class",
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
> .mode(SaveMode.Append)
> .save(tablePath) {code}
>
> Exception:
> {code:java}
> org.apache.hudi.exception.HoodieException: Error limiting instant archival
> based on metadata table
> at
> org.apache.hudi.client.timeline.versioning.v1.TimelineArchiverV1.getInstantsToArchive(TimelineArchiverV1.java:309)
> at
> org.apache.hudi.client.timeline.versioning.v1.TimelineArchiverV1.archiveIfRequired(TimelineArchiverV1.java:142)
> at
> org.apache.hudi.client.BaseHoodieTableServiceClient.archive(BaseHoodieTableServiceClient.java:834)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:887)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.autoArchiveOnCommit(BaseHoodieWriteClient.java:616)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:582)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:258)
> at
> org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:93)
> at
> org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:992)
> at
> org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:534)
> at
> org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:190)
> at
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
> at
> org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
> at
> org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:157)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$10(SQLExecution.scala:220)
> at
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
> at
> org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:220)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:405)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:219)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:83)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
> at
> org.apache.spark.sql.adapter.BaseSpark3Adapter.sqlExecutionWithNewExecutionId(BaseSpark3Adapter.scala:105)
> at
> org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:212)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:127)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:170)
> at
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
> at
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
> at
> org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
> at
> org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:157)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$10(SQLExecution.scala:220)
> at
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
> at
> org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:384)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:220)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:405)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:219)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:83)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:520)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:520)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
> at
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:303)
> at
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:299)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:496)
> at
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
> at
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101)
> at
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
> at
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:164)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:884)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:405)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:365)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:244)
> ... 55 elided
> Caused by: java.lang.UnsupportedOperationException
> at
> org.apache.hudi.metadata.FileSystemBackedTableMetadata.getLatestCompactionTime(FileSystemBackedTableMetadata.java:275)
> at
> org.apache.hudi.client.timeline.versioning.v1.TimelineArchiverV1.getInstantsToArchive(TimelineArchiverV1.java:299)
> ... 117 more {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)