chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1639955458
To eliminate the column case-sensitivity issue I renamed all columns to
static strings with indexes — except for `_id.oid`, `cdc_pk`, and
`addressLog`. Also, I printed the schema of `addressLog` and made sure none of
the columns repeat. Yet, again the same exception.
NOTE: I am running on AWS EMR with only Ganglia and Spark selected
(no Hive / Glue / Hudi selected).
addressLog column schema
```
|-- addressLogs: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- addressLines: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- city: string (nullable = true)
| | |-- country: string (nullable = true)
| | |-- createdDate: string (nullable = true)
| | |-- fieldId: string (nullable = true)
| | |-- isDerived: boolean (nullable = true)
| | |-- latLong: string (nullable = true)
| | |-- location: string (nullable = true)
| | |-- locationIp: struct (nullable = true)
| | | |-- city: string (nullable = true)
| | | |-- continentCode: string (nullable = true)
| | | |-- continentName: string (nullable = true)
| | | |-- country: string (nullable = true)
| | | |-- countryIsoCode: string (nullable = true)
| | | |-- latitude: string (nullable = true)
| | | |-- longitude: string (nullable = true)
| | | |-- postalCode: string (nullable = true)
| | | |-- registeredCountry: string (nullable = true)
| | | |-- registeredCountryIsoCode: string (nullable = true)
| | | |-- subDivisions: string (nullable = true)
| | | |-- subDivisionsIsoCode: string (nullable = true)
| | | |-- timeZone: string (nullable = true)
| | |-- original: struct (nullable = true)
| | | |-- city: string (nullable = true)
| | | |-- country: string (nullable = true)
| | | |-- location: string (nullable = true)
| | | |-- state: string (nullable = true)
| | |-- residentialType: string (nullable = true)
| | |-- source: string (nullable = true)
| | |-- sourceType: string (nullable = true)
| | |-- standardized: boolean (nullable = true)
| | |-- standardizedDate: string (nullable = true)
| | |-- state: string (nullable = true)
| | |-- updatedDate: string (nullable = true)
| | |-- zipCode: string (nullable = true)
```
final code
```
import org.apache.spark.sql.{Column, DataFrame}
import com.phenom.messagingv2.common.Application
import com.phenom.messagingv2.utils.SparkUtils
import org.apache.commons.lang3.ClassUtils.getCanonicalName
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
import org.apache.hudi.common.model.{HoodieAvroPayload,
HoodieFileFormat, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import java.util
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator,
SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, hash, lit}
import org.apache.hudi.QuickstartUtils._
/**
 * Renames the columns of `df` to positional placeholder names
 * (`_index_0`, `_index_1`, ...) to rule out column-name
 * case-sensitivity problems.
 *
 * Columns named `cdc_pk`, or whose name contains `_id`, `oid`, or
 * `addressLogs`, keep their original names because they are referenced
 * by name later (record key, precombine field, and the column under
 * investigation).
 *
 * @param df the input DataFrame
 * @return a DataFrame with every non-exempt column renamed
 */
def renameColumnsWithIndex(df: DataFrame): DataFrame = {
  // Columns that must keep their original name.
  def isExempt(name: String): Boolean =
    "cdc_pk".equals(name) || name.contains("_id") ||
      name.contains("oid") || name.contains("addressLogs")

  // Debug aid: show the full set of candidate replacement names,
  // mirroring the original implementation's output.
  val newColumnNames = df.columns.indices.map(i => s"_index_$i")
  println(newColumnNames)

  // Fold over (column, index) pairs, threading the DataFrame through
  // each rename. This replaces the original's `var` mutation inside a
  // side-effecting `.map` followed by a no-op `.foldLeft`.
  df.columns.zipWithIndex.foldLeft(df) { case (acc, (oldName, i)) =>
    if (isExempt(oldName)) {
      acc // exempt column: leave untouched
    } else {
      val newName = s"_index_$i"
      println(oldName + " -> " + newName)
      acc.withColumnRenamed(oldName, newName)
    }
  }
}
// Use the ambient Spark session provided by the shell/notebook.
val sess = spark;
// Load the raw MongoDB snapshot from S3 as Parquet; cache it because it
// is queried more than once below.
val snapshotDf =
sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
snapshotDf.cache()
// NOTE(review): registerTempTable is deprecated in modern Spark;
// createOrReplaceTempView is the current equivalent (left unchanged).
snapshotDf.registerTempTable("snapshot")
// 472 is the culprit
// Rename all non-exempt columns to positional names to rule out
// case-sensitivity issues, limited to the first 472 rows — the smallest
// subset that reproduces the failure.
val snapshotDf2 = renameColumnsWithIndex(sess.sql("select * from
snapshot order by _id.oid limit 472 ") )
snapshotDf2.registerTempTable("snapshot2")
// val snapshotDf2 = sess.sql("select _id, cdc_pk,eventId,
additionalConfig, additionalFields, additionalRequestInfo, address,
addressLogs, alertData,alertUrl,applySources,applyTransactionId,atsId,
experience from snapshot ")
// Keep only the record-key (_id), precombine (cdc_pk), and suspect
// (addressLogs) columns for the Hudi write.
val snapshotDf3 = sess.sql("select _id, cdc_pk, addressLogs from
snapshot2 ")
// Write the trimmed DataFrame as a Hudi table: `_id.oid` is the record
// key, `cdc_pk` the precombine field; Overwrite replaces any prior data
// at the target path.
snapshotDf3.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
.option(HoodieWriteConfig.TABLE_NAME, "GE11")
.mode(SaveMode.Overwrite)
.save("s3://buket/snapshots-hudi/ge11-drop/snapshot");
```
exception
```
07-18 10:24:07 ${sys:config.appname} ERROR HoodieSparkSqlWriter$: UPSERT
failed with errors
org.apache.hudi.exception.HoodieException: Write to Hudi failed
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
at
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
at
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
... 53 elided
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]