[ https://issues.apache.org/jira/browse/SPARK-27339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17609958#comment-17609958 ]
sam commented on SPARK-27339: ----------------------------- [~hyukjin.kwon][~wrschneider99] [~ksbalas]. We are working on a workaround by defining our own `Encoder`, which we pass explicitly at the `.as` call site. We generate the `Encoder` from the `StructType`, which carries the correct precisions and scales (see below). Regarding how I'd like to see this fixed in Spark: - Provide an `as` method that takes a `StructType` plus a `T` - Or an `ExpressionEncoder.apply` method that takes a `StructType` plus a `T` CODE NOT FULLY TESTED YET {code:java} object Thing { import scala.reflect.runtime.universe.TypeTag def encoderWithSchema[T: ClassTag: TypeTag](schema: StructType): ExpressionEncoder[T] = Thing.correctType[T](ExpressionEncoder[T], schema) def correctType[T](encoder: ExpressionEncoder[T], schema: StructType): ExpressionEncoder[T] = { val ExpressionEncoder(_, flat, serializer, deserializer, clsTag) = encoder ExpressionEncoder( schema = schema, flat = flat, serializer = serializer.zip(schema).map { case (fieldExpression, structField) => correctType(fieldExpression, structField) }, deserializer = deserializer match { case newInstance: NewInstance => newInstance.copy(arguments = newInstance.arguments.zip(schema).map { case (fieldExpression, structField) => correctType(fieldExpression, structField) }) case other => other }, clsTag = clsTag ) } def correctType(expression: Expression, field: StructField): Expression = expression match { case alias @ Alias(child, name) => Alias(correctType(child, field), name)(alias.exprId, alias.qualifier, alias.explicitMetadata) case UnwrapOption(dataType, child) => UnwrapOption(dataType, correctType(child, field)) case WrapOption(child, dataType) => WrapOption(correctType(child, field), dataType) case AssertNotNull(child, walkedTypePath) => AssertNotNull(correctType(child, field), walkedTypePath) case UpCast(child, dataType, walkedTypePath) => { UpCast(child, field.dataType, walkedTypePath) } case UnresolvedMapObjects(fn, child, 
customCollectionCls) => UnresolvedMapObjects(arg => correctType(fn(arg), field), child, customCollectionCls) case If(predicate, trueValue, falseValue) => If(predicate, correctType(trueValue, field), correctType(falseValue, field)) case newInstance: NewInstance => { field.dataType match { case StructType(fields) => { newInstance.copy(arguments = newInstance.arguments.zip(fields).map { case (fieldExpression, structField) => correctType(fieldExpression, structField) }) } case ArrayType(StructType(fields), _) => newInstance.copy(arguments = newInstance.arguments.zip(fields).map { case (fieldExpression, structField) => correctType(fieldExpression, structField) }) case _ => newInstance } } case invoke: Invoke => invoke.copy(targetObject = correctType(invoke.targetObject, field)) case staticInvoke: StaticInvoke => staticInvoke//.copy(dataType = field.dataType) case other => other } } {code} > Decimal up cast to higher scale fails while reading parquet to Dataset > ---------------------------------------------------------------------- > > Key: SPARK-27339 > URL: https://issues.apache.org/jira/browse/SPARK-27339 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.1.0, 2.4.0 > Reporter: Bill Schneider > Priority: Major > > Given a Parquet file with a decimal(38,4) field, one can read it into a > DataFrame, but reading/casting it to a Dataset using a case class with a > BigDecimal field fails. 
> {code:java} > import org.apache.spark.sql.{SaveMode, SparkSession} > object ReproduceSparkDecimalBug extends App{ > case class SimpleDecimal(value: BigDecimal) > val path = "/tmp/sparkTest" > val spark = SparkSession.builder().master("local").getOrCreate() > import spark.implicits._ > spark > .sql("SELECT CAST(10.12345 AS DECIMAL(38,4)) AS value ") > .write > .mode(SaveMode.Overwrite) > .parquet(path) > // works fine and the dataframe will have a decimal(38,4) > val df = spark.read.parquet(path) > df.printSchema() > df.show(1) > // will fail -> org.apache.spark.sql.AnalysisException: Cannot up cast > `value` from decimal(38,4) to decimal(38,18) as it may truncate > // 1. Why Spark sees scala BigDecimal as fixed (38,18)? > // 2. Up casting to higher scale should be allowed anyway > val ds = df.as[SimpleDecimal] > ds.printSchema() > spark.close() > } > {code} > {code:java} > org.apache.spark.sql.AnalysisException: Cannot up cast `value` from > decimal(38,4) to decimal(38,18) as it may truncate > The type path of the target object is: > - field (class: "scala.math.BigDecimal", name: "value") > - root class: "ReproduceSparkDecimalBug.SimpleDecimal" > You can either add an explicit cast to the input data or choose a higher > precision type of the field in the target object; > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2366) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$35$$anonfun$applyOrElse$15.applyOrElse(Analyzer.scala:2382) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$35$$anonfun$applyOrElse$15.applyOrElse(Analyzer.scala:2377) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) > at > 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258) > at > 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:258) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:249) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$35.applyOrElse(Analyzer.scala:2377) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$35.applyOrElse(Analyzer.scala:2373) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2373) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2360) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82) > at > scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) > at scala.collection.immutable.List.foldLeft(List.scala:84) > at > 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74) > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:255) > at org.apache.spark.sql.Dataset.<init>(Dataset.scala:206) > at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170) > at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61) > at org.apache.spark.sql.Dataset.as(Dataset.scala:380) > at > ReproduceSparkDecimalBug$.delayedEndpoint$ReproduceSparkDecimalBug$1(ReproduceSparkDecimalBug.scala:27) > at > ReproduceSparkDecimalBug$delayedInit$body.apply(ReproduceSparkDecimalBug.scala:4) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at ReproduceSparkDecimalBug$.main(ReproduceSparkDecimalBug.scala:4) > at ReproduceSparkDecimalBug.main(ReproduceSparkDecimalBug.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, 
e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org