[
https://issues.apache.org/jira/browse/SPARK-35291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347302#comment-17347302
]
Umar Asir edited comment on SPARK-35291 at 5/19/21, 5:19 AM:
-------------------------------------------------------------
It's an issue with Delta, but the NullPointerException is thrown from
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown Source)".
was (Author: umerasir):
It's an issue with delta.
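For context on that frame: createexternalrow / CreateExternalRow is the row deserializer that Spark generates from the target schema. The following is a minimal sketch, using the Catalyst-internal RowEncoder API purely for illustration (it is not part of the attached repro), that prints that deserializer for the target schema reported in the error:

{code:scala}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

object ShowDeserializer {
  def main(args: Array[String]): Unit = {
    // Target schema as it appears in the StructFields of the error message.
    val targetSchema = StructType(Seq(
      StructField("TGTID", IntegerType, nullable = false),
      StructField("TGT_NAME", StringType, nullable = false),
      StructField("TGT_ADDRESS", StringType, nullable = true),
      StructField("TGT_DOB", DateType, nullable = true)))

    // Prints the createexternalrow(...) expression tree. Because TGT_NAME is
    // declared non-nullable, its value is converted with .toString and no null
    // guard, which is where a null that slips through surfaces as an NPE
    // instead of a constraint violation.
    println(RowEncoder(targetSchema).deserializer)
  }
}
{code}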
> NullPointerException at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown
> Source)
> --------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-35291
> URL: https://issues.apache.org/jira/browse/SPARK-35291
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, SQL
> Affects Versions: 2.3.1, 3.0.2
> Reporter: Umar Asir
> Priority: Major
> Attachments: NotNullIssue.scala, cdwqasourceupdate.7z,
> cdwqatgtupdate.7z, pom.xml, run1.log
>
>
> We are trying to merge data using DeltaTable's merge API. Inserting a null
> value into a not-null column results in a NullPointerException instead of a
> constraint violation error.
> *Code:*
> {code:scala}
> package com.uasir.cdw.delta
>
> import org.apache.spark.sql._
> import io.delta.tables._
>
> object NotNullIssue {
>   def main(args: Array[String]): Unit = {
>     System.setProperty("hadoop.home.dir", "C:\\Tools\\hadoop\\")
>     val spark = SparkSession
>       .builder()
>       .appName("DFMergeTest")
>       .master("local[*]")
>       .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
>       .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
>       .config("spark.testing.memory", "571859200")
>       .getOrCreate()
>
>     println("Reading from the source table")
>     val df = spark.read.format("delta").load("C:\\Input\\cdwqasourceupdate")
>     // PFA cdwqasourceupdate.7z
>     df.show()
>
>     println("Reading from the target table")
>     val tgtDf = spark.read.format("delta").load("C:\\Input\\cdwqatgtupdate")
>     // PFA cdwqatgtupdate.7z
>     tgtDf.show()
>
>     val sourceTable = "source"
>     val targetDataTable = "target"
>
>     // Map each target column to the source column at the same position.
>     val colMap = scala.collection.mutable.Map[String, String]()
>     val sourceFields = df.schema.fieldNames
>     val targetFields = tgtDf.schema.fieldNames
>     for (i <- 0 until targetFields.length) {
>       colMap(targetFields(i)) = sourceTable + "." + sourceFields(i)
>     }
>     /* colMap will be generated as:
>        TGTID       -> source.c1_ID
>        TGT_NAME    -> source.c2_NAME
>        TGT_ADDRESS -> source.c3_address
>        TGT_DOB     -> source.c4_dob
>      */
>
>     println("update")
>     DeltaTable.forPath(spark, "C:\\Input\\cdwqatgtupdate")
>       .as(targetDataTable)
>       .merge(
>         df.as(sourceTable),
>         targetDataTable + "." + "TGTID" + " = " + sourceTable + "." + "c1_ID")
>       .whenMatched()
>       .updateExpr(colMap)
>       .execute()
>
>     println("Reading from the target table after the operation")
>     tgtDf.show()
>   }
> }
> {code}
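> Before calling merge, a check along the following lines (a sketch, not part of the attached repro; it reuses df, tgtDf and the positional source-to-target mapping built above) would reject the bad rows with an explicit message instead of the decoder NPE shown below:
> {code:scala}
> // Hedged sketch: fail fast if a source column that feeds a not-null target
> // column contains nulls. Assumes df (source), tgtDf (target) and the same
> // positional column mapping as above.
> import org.apache.spark.sql.functions.col
>
> val srcFields = df.schema.fieldNames
> tgtDf.schema.fields.zipWithIndex.foreach { case (targetField, i) =>
>   if (!targetField.nullable) {
>     val sourceCol = srcFields(i)
>     val nullCount = df.filter(col(sourceCol).isNull).count()
>     require(nullCount == 0,
>       s"Source column '$sourceCol' has $nullCount null value(s) but maps to " +
>       s"not-null target column '${targetField.name}'")
>   }
> }
> {code}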
> *Error:*
> {code:java}
> Caused by: java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
> createexternalrow(input[0, int, false], input[1, string, false].toString, input[2, string, true].toString, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Date), toJavaDate, input[3, date, true], true, false), StructField(TGTID,IntegerType,false), StructField(TGT_NAME,StringType,false), StructField(TGT_ADDRESS,StringType,true), StructField(TGT_DOB,DateType,true))
> 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:188)
> 	at org.apache.spark.sql.delta.commands.MergeIntoCommand$JoinedRowProcessor.$anonfun$processPartition$9(MergeIntoCommand.scala:565)
> 	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> 	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
> 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:127)
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
> 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
> 	... 17 more
> {code}
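> Reading the decoder text above: the only object-typed field read without a null guard is input[1, string, false].toString, i.e. StructField(TGT_NAME,StringType,false), which the positional colMap fills from source column c2_NAME. A quick way to confirm which source rows carry the offending null (a sketch assuming the same df as in the code above):
> {code:scala}
> import org.apache.spark.sql.functions.col
>
> // Rows whose c2_NAME is null violate the not-null TGT_NAME column and
> // trigger the NullPointerException during the merge.
> df.filter(col("c2_NAME").isNull).show(truncate = false)
> {code}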
> *PFA:*
> # Sample program [^NotNullIssue.scala]
> # Source data (Parquet files) [^cdwqasourceupdate.7z]
> # Target data (Parquet files) [^cdwqatgtupdate.7z]
> # Log [^run1.log]
> # Maven build file [^pom.xml]
>
>