navbalaraman commented on issue #7060:
URL: https://github.com/apache/hudi/issues/7060#issuecomment-1310495311
@nsivabalan Now I am seeing an issue with the upgraded job running on EMR
6.8.0. The initial pull of the data happened with Hudi 0.9.0 and Spark 3.1.2 on
EMR 6.5, and the incremental pull with Hudi 0.12.0 and Spark 3.3.0 on EMR 6.8.0
is now failing with the error below.
Any thoughts on what is needed to make this incremental pull work without
having to wipe out the existing data and redo the initial pull with the new version?
It throws an error saying that "_col_partition", the partition field (set
as below), is missing. I verified that it does exist in the Glue catalog, and in
the existing parquet files the partition path appears as
"_hoodie_partition_path":"_col_partition=2022-10-12".
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "_col_partition",
Also, on both the initial and incremental pulls we set the config below to avoid
a duplicate partition column showing up in the Glue catalog:
option(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(), true)
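As a triage step (not part of the original job), this is roughly how I am checking what the table itself has recorded for the partition field after the upgrade. The file contents below are illustrative placeholders, not our real table's values; `hoodie.table.partition.fields` is the key Hudi persists in `.hoodie/hoodie.properties` under the table base path:

```shell
# Hypothetical local copy of the table's .hoodie/hoodie.properties
# (fetch with e.g. `aws s3 cp s3://<bucket>/<table>/.hoodie/hoodie.properties .`).
# The values below are illustrative, not our real table's contents.
cat > hoodie.properties <<'EOF'
hoodie.table.name=my_table
hoodie.table.version=5
hoodie.table.partition.fields=_col_partition
EOF

# The partition field the 0.12.x reader resolves comes from this table config:
grep -E 'hoodie.table.(version|partition.fields)' hoodie.properties
```

If the persisted partition field here disagrees with what Glue shows, that would explain the mismatch the analyzer reports.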
Other configs:
HoodieWriteConfig.TBL_NAME.key() -> tableName,
DataSourceWriteOptions.TABLE_TYPE.key() -> "COPY_ON_WRITE",
DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_id.oid",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "_col_partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "_update_ts",
DataSourceWriteOptions.HIVE_SYNC_ENABLED.key() -> "true",
DataSourceWriteOptions.HIVE_URL.key() -> s"jdbc:hive2://${masterDns}:10000",
DataSourceWriteOptions.HIVE_DATABASE.key() -> dbName,
DataSourceWriteOptions.HIVE_TABLE.key() -> tableName,
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[MultiPartKeysValueExtractor].getName,
DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key() -> "_col_partition",
HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> "1",
HoodieCleanConfig.CLEANER_POLICY.key() -> HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name(),
HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key() -> "true",
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key() -> dbName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key() -> tableName,
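For completeness, a sketch of how the job assembles these options and invokes the write. `df`, `tableName`, `dbName`, `masterDns`, and `basePath` stand in for our actual values, and the import paths are my best guess for Hudi 0.12.x:

```scala
// Sketch only: placeholders for the real job's df / tableName / dbName /
// masterDns / basePath. Import locations assumed for Hudi 0.12.x.
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.HoodieCleaningPolicy
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.spark.sql.SaveMode

val hudiOptions: Map[String, String] = Map(
  HoodieWriteConfig.TBL_NAME.key() -> tableName,
  DataSourceWriteOptions.TABLE_TYPE.key() -> "COPY_ON_WRITE",
  DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_id.oid",
  DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "_col_partition",
  DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "_update_ts",
  DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key() -> "true",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED.key() -> "true",
  DataSourceWriteOptions.HIVE_URL.key() -> s"jdbc:hive2://${masterDns}:10000",
  DataSourceWriteOptions.HIVE_DATABASE.key() -> dbName,
  DataSourceWriteOptions.HIVE_TABLE.key() -> tableName,
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key() ->
    classOf[MultiPartKeysValueExtractor].getName,
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key() -> "_col_partition",
  HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> "1",
  HoodieCleanConfig.CLEANER_POLICY.key() ->
    HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name(),
  HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key() -> "true",
  HoodieSyncConfig.META_SYNC_DATABASE_NAME.key() -> dbName,
  HoodieSyncConfig.META_SYNC_TABLE_NAME.key() -> tableName
)

// Incremental pull is written back with Append mode on the same base path.
df.write
  .format("hudi")
  .options(hudiOptions)
  .mode(SaveMode.Append)
  .save(basePath)
```

The failure happens after the commit, during the meta-sync / catalog refresh triggered by this write.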
ERROR STACK TRACE:
diagnostics: User class threw exception:
org.sparkproject.guava.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Cannot find columns: '_col_partition' in the schema[StructField(_hoodie_commit_time,StringType,true),StructField(_hoodie_commit_seqno,StringType,true),StructField(_hoodie_record_key,StringType,true),StructField(_hoodie_partition_path,StringType,true),StructField(_hoodie_file_name,StringType,true),StructField(_col_origin_ts,LongType,true),StructField(_col_update_ts,LongType,true),StructField(devicetype,StringType,true),StructField(deviceid,StringType,true),StructField(createdOn,TimestampType,true),StructField(channel,StringType,true),StructField(source,StringType,true),StructField(_class,StringType,true),StructField(model,StringType,true),StructField(commandoperation,StringType,true),StructField(phases,ArrayType(StructType(StructField(dateTime,TimestampType,true),StructField(dcmErrorCode,StringType,true),StructField(message,StringType,true),StructField(payload,StringType,true),StructField(phaseDescription,StringType,true),StructField(phaseId,IntegerType,true),StructField(statusCode,IntegerType,true),StructField(statusDescription,StringType,true)),true),true),StructField(guid,StringType,true),StructField(commanddatetime,TimestampType,true),StructField(_id,StructType(StructField(oid,StringType,true)),true),StructField(isacsettingson,BooleanType,true),StructField(command,StringType,true),StructField(returncode,StringType,true),StructField(osversion,StringType,true),StructField(totaltimetaken,LongType,true),StructField(dcmtype,StringType,true),StructField(status,StringType,true),StructField(vin,StringType,true),StructField(correlationid,StringType,true),StructField(apprequestnumber,StringType,true),StructField(profilename,StringType,true),StructField(make,StringType,true),StructField(commandinitiatedby,StringType,true),StructField(xlocale,StringType,true),StructField(commandrequest,StringType,true),StructField(modifiedOn,TimestampType,true),StructField(appbrand,StringType,true)]
    at org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2263)
    at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.sparkproject.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:177)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:272)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:320)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:306)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$4(AnalysisHelper.scala:175)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
    at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:226)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:175)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:30)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:306)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:246)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:215)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:212)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$6(RuleExecutor.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$RuleExecutionContext$.withContext(RuleExecutor.scala:327)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5(RuleExecutor.scala:284)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5$adapted(RuleExecutor.scala:274)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:274)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:188)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:227)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:223)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:172)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:223)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:187)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:208)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:78)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:78)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:68)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:93)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:91)
    at org.apache.spark.sql.SparkSession.table(SparkSession.scala:604)
    at org.apache.spark.sql.internal.CatalogImpl.refreshTable(CatalogImpl.scala:540)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$3(HoodieSparkSqlWriter.scala:658)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$3$adapted(HoodieSparkSqlWriter.scala:655)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:655)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:734)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:338)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:183)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)