chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1639955458

   To eliminate the column case-sensitivity issue I renamed all columns to 
static strings with indexes — except for `_id.oid`, `cdc_pk`, and 
`addressLog`. Also, I printed the schema of `addressLog` and made sure none of 
the columns repeat. Yet, again the same exception.
   
   NOTE: note that I am running on AWS EMR with only ganglia, Spark selected 
(no hive/ glue/ hudi are selected)
   
   
   addressLog column schema
   ```
   |-- addressLogs: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- addressLines: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- city: string (nullable = true)
    |    |    |-- country: string (nullable = true)
    |    |    |-- createdDate: string (nullable = true)
    |    |    |-- fieldId: string (nullable = true)
    |    |    |-- isDerived: boolean (nullable = true)
    |    |    |-- latLong: string (nullable = true)
    |    |    |-- location: string (nullable = true)
    |    |    |-- locationIp: struct (nullable = true)
    |    |    |    |-- city: string (nullable = true)
    |    |    |    |-- continentCode: string (nullable = true)
    |    |    |    |-- continentName: string (nullable = true)
    |    |    |    |-- country: string (nullable = true)
    |    |    |    |-- countryIsoCode: string (nullable = true)
    |    |    |    |-- latitude: string (nullable = true)
    |    |    |    |-- longitude: string (nullable = true)
    |    |    |    |-- postalCode: string (nullable = true)
    |    |    |    |-- registeredCountry: string (nullable = true)
    |    |    |    |-- registeredCountryIsoCode: string (nullable = true)
    |    |    |    |-- subDivisions: string (nullable = true)
    |    |    |    |-- subDivisionsIsoCode: string (nullable = true)
    |    |    |    |-- timeZone: string (nullable = true)
    |    |    |-- original: struct (nullable = true)
    |    |    |    |-- city: string (nullable = true)
    |    |    |    |-- country: string (nullable = true)
    |    |    |    |-- location: string (nullable = true)
    |    |    |    |-- state: string (nullable = true)
    |    |    |-- residentialType: string (nullable = true)
    |    |    |-- source: string (nullable = true)
    |    |    |-- sourceType: string (nullable = true)
    |    |    |-- standardized: boolean (nullable = true)
    |    |    |-- standardizedDate: string (nullable = true)
    |    |    |-- state: string (nullable = true)
    |    |    |-- updatedDate: string (nullable = true)
    |    |    |-- zipCode: string (nullable = true)
   ```
   
   final code
   
   ```
       import org.apache.spark.sql.{Column, DataFrame}
       import com.phenom.messagingv2.common.Application
       import com.phenom.messagingv2.utils.SparkUtils
       import org.apache.commons.lang3.ClassUtils.getCanonicalName
       import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
       import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
       import org.apache.hudi.common.model.{HoodieAvroPayload, 
HoodieFileFormat, WriteOperationType}
       import org.apache.hudi.common.table.HoodieTableConfig
       import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
       import org.apache.hudi.keygen.constant.KeyGeneratorOptions
   
       import java.util
       import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
       import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, 
SimpleKeyGenerator}
       import org.apache.spark.sql.SaveMode
       import org.apache.spark.sql.functions.{col, hash, lit}
       import org.apache.hudi.QuickstartUtils._
   
   
       def renameColumnsWithIndex(df: DataFrame): DataFrame = {
         var df2 = df;
         val newColumnNames = (0 to df.columns.length - 1).map(i => 
s"_index_$i")
         println(newColumnNames)
         df.columns.zip(newColumnNames).map { 
           case (oldName, newName) => 
             if("cdc_pk".equals(oldName) || oldName.contains("_id") ||  
oldName.contains("oid") || oldName.contains("addressLogs")  ) { 
               // do nothing  
             }else{ 
               println(oldName+" -> "+ newName); 
               df2 = df2.withColumnRenamed(oldName, newName) 
             }
         }.foldLeft(df2)((acc, df2) => acc)
       }
   
   
       val sess = spark;
       val snapshotDf = 
sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
       snapshotDf.cache()
       snapshotDf.registerTempTable("snapshot")
   
       // 472 is the culprit
       val snapshotDf2 = renameColumnsWithIndex(sess.sql("select * from 
snapshot order by _id.oid limit 472 ")  ) 
       snapshotDf2.registerTempTable("snapshot2") 
       // val snapshotDf2 = sess.sql("select _id, cdc_pk,eventId, 
additionalConfig, additionalFields, additionalRequestInfo, address, 
addressLogs, alertData,alertUrl,applySources,applyTransactionId,atsId, 
experience from snapshot ")
       val snapshotDf3 = sess.sql("select _id, cdc_pk, addressLogs  from 
snapshot2 ")
       snapshotDf3.write.format("hudi")
         .options(getQuickstartWriteConfigs)
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
         .option(HoodieWriteConfig.TABLE_NAME, "GE11")
         .mode(SaveMode.Overwrite)
         .save("s3://buket/snapshots-hudi/ge11-drop/snapshot");
   
   ```
   
   exception
   ```
   07-18 10:24:07 ${sys:config.appname} ERROR HoodieSparkSqlWriter$: UPSERT 
failed with errors
   org.apache.hudi.exception.HoodieException: Write to Hudi failed
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
     at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
     at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
     at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
     at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
     at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
     at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
     at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
     at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
     at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
     at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
     at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
     at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
     at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
     at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
     at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
     at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
     at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
     at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
     at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
     at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
     at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
     at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
     at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
     at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
     at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
     at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
     at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
     ... 53 elided
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to