[ https://issues.apache.org/jira/browse/SPARK-27339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17609958#comment-17609958 ]

sam commented on SPARK-27339:
-----------------------------

[~hyukjin.kwon] [~wrschneider99] [~ksbalas] We are working on a workaround: we define our own `Encoder` and pass it explicitly at the `.as` call site. We generate the `Encoder` from the `StructType`, which knows the correct precision and scale (see below).

Regarding how I'd like to see this fixed in Spark, either of the following would work (rough sketch below):

 - Provide an `as` method that takes a `StructType` in addition to the type parameter `T`
 - Or an `ExpressionEncoder.apply` method that takes a `StructType` in addition to the type parameter `T`
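
Neither method exists today; the signatures below are only an illustration of the shape of the proposal (names such as `ProposedApi` and `encoderFor` are mine, not Spark API):

{code:java}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.StructType

// Hypothetical API sketch only -- neither method exists in Spark.
trait ProposedApi {
  // Proposal 1: an `as`-style method on Dataset that honours the supplied schema's
  // decimal precision/scale instead of the (38,18) inferred from scala BigDecimal.
  def as[T: ClassTag: TypeTag](schema: StructType): Dataset[T]

  // Proposal 2: an ExpressionEncoder factory (e.g. an `apply` overload) that does the same.
  def encoderFor[T: ClassTag: TypeTag](schema: StructType): ExpressionEncoder[T]
}
{code}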



CODE NOT FULLY TESTED YET

{code:java}
object Thing {
  import scala.reflect.ClassTag
  import scala.reflect.runtime.universe.TypeTag

  // Imports below assume Spark 2.4 catalyst internals.
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
  import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, UpCast}
  import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, Invoke, NewInstance, StaticInvoke, UnresolvedMapObjects, UnwrapOption, WrapOption}
  import org.apache.spark.sql.types.{ArrayType, StructField, StructType}

  // Build an encoder for T whose decimal fields carry the precision/scale of the given schema.
  def encoderWithSchema[T: ClassTag: TypeTag](schema: StructType): ExpressionEncoder[T] =
    Thing.correctType[T](ExpressionEncoder[T], schema)

  // Rewrite the encoder's serializer/deserializer expressions so their data types match the schema.
  def correctType[T](encoder: ExpressionEncoder[T], schema: StructType): ExpressionEncoder[T] = {
    val ExpressionEncoder(_, flat, serializer, deserializer, clsTag) = encoder

    ExpressionEncoder(
      schema = schema,
      flat = flat,
      serializer = serializer.zip(schema).map {
        case (fieldExpression, structField) => correctType(fieldExpression, structField)
      },
      deserializer = deserializer match {
        case newInstance: NewInstance =>
          newInstance.copy(arguments = newInstance.arguments.zip(schema).map {
            case (fieldExpression, structField) => correctType(fieldExpression, structField)
          })
        case other => other
      },
      clsTag = clsTag
    )
  }

  // Recursively descend through the expression tree, replacing the data type inferred
  // from the case class with the data type from the corresponding StructField.
  def correctType(expression: Expression, field: StructField): Expression = expression match {
    case alias @ Alias(child, name) =>
      Alias(correctType(child, field), name)(alias.exprId, alias.qualifier, alias.explicitMetadata)
    case UnwrapOption(dataType, child) => UnwrapOption(dataType, correctType(child, field))
    case WrapOption(child, dataType) => WrapOption(correctType(child, field), dataType)
    case AssertNotNull(child, walkedTypePath) => AssertNotNull(correctType(child, field), walkedTypePath)

    // The actual fix: upcast to the type the data really has, e.g. decimal(38,4),
    // rather than the decimal(38,18) inferred from scala BigDecimal.
    case UpCast(child, _, walkedTypePath) => UpCast(child, field.dataType, walkedTypePath)

    case UnresolvedMapObjects(fn, child, customCollectionCls) =>
      UnresolvedMapObjects(arg => correctType(fn(arg), field), child, customCollectionCls)

    case If(predicate, trueValue, falseValue) =>
      If(predicate, correctType(trueValue, field), correctType(falseValue, field))

    case newInstance: NewInstance =>
      field.dataType match {
        case StructType(fields) =>
          newInstance.copy(arguments = newInstance.arguments.zip(fields).map {
            case (fieldExpression, structField) => correctType(fieldExpression, structField)
          })
        case ArrayType(StructType(fields), _) =>
          newInstance.copy(arguments = newInstance.arguments.zip(fields).map {
            case (fieldExpression, structField) => correctType(fieldExpression, structField)
          })
        case _ => newInstance
      }

    case invoke: Invoke => invoke.copy(targetObject = correctType(invoke.targetObject, field))
    case staticInvoke: StaticInvoke => staticInvoke //.copy(dataType = field.dataType)
    case other => other
  }
}
{code}
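
For completeness, call-site usage of the workaround would look roughly like this (untested sketch; the path and case class are taken from the repro below, and `WorkaroundUsage` is just an illustrative name):

{code:java}
import org.apache.spark.sql.SparkSession

// Rough usage sketch of the workaround above (untested; mirrors the repro below).
object WorkaroundUsage extends App {
  case class SimpleDecimal(value: BigDecimal)

  val spark = SparkSession.builder().master("local").getOrCreate()

  val df = spark.read.parquet("/tmp/sparkTest")

  // Build an encoder whose decimal field carries the (38,4) precision/scale from df.schema,
  // and pass it explicitly to `.as` instead of relying on the implicit encoder derived from BigDecimal.
  val ds = df.as[SimpleDecimal](Thing.encoderWithSchema[SimpleDecimal](df.schema))

  ds.printSchema()
  spark.close()
}
{code}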


> Decimal up cast to higher scale fails while reading parquet to Dataset
> ----------------------------------------------------------------------
>
>                 Key: SPARK-27339
>                 URL: https://issues.apache.org/jira/browse/SPARK-27339
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.4.0
>            Reporter: Bill Schneider
>            Priority: Major
>
> Given a parquet file with a decimal(38,4) field: it can be read into a DataFrame, but reading/casting it to a Dataset using a case class with a BigDecimal field fails.
> {code:java}
> import org.apache.spark.sql.{SaveMode, SparkSession}
> object ReproduceSparkDecimalBug extends App{
>   case class SimpleDecimal(value: BigDecimal)
>   val path = "/tmp/sparkTest"
>   val spark = SparkSession.builder().master("local").getOrCreate()
>   import spark.implicits._
>   spark
>     .sql("SELECT CAST(10.12345 AS DECIMAL(38,4)) AS value ")
>     .write
>     .mode(SaveMode.Overwrite)
>     .parquet(path)
>   // works fine and the dataframe will have a decimal(38,4)
>   val df = spark.read.parquet(path)
>   df.printSchema()
>   df.show(1)
> // will fail -> org.apache.spark.sql.AnalysisException: Cannot up cast `value` from decimal(38,4) to decimal(38,18) as it may truncate
> // 1. Why does Spark see scala BigDecimal as fixed (38,18)?
> // 2. Up casting to a higher scale should be allowed anyway
>   val ds = df.as[SimpleDecimal]
>   ds.printSchema()
>   spark.close()
> }
> {code}
> {code:java}
> org.apache.spark.sql.AnalysisException: Cannot up cast `value` from 
> decimal(38,4) to decimal(38,18) as it may truncate
> The type path of the target object is:
> - field (class: "scala.math.BigDecimal", name: "value")
> - root class: "ReproduceSparkDecimalBug.SimpleDecimal"
> You can either add an explicit cast to the input data or choose a higher 
> precision type of the field in the target object;
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2366)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$35$$anonfun$applyOrElse$15.applyOrElse(Analyzer.scala:2382)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$35$$anonfun$applyOrElse$15.applyOrElse(Analyzer.scala:2377)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>       at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.immutable.List.map(List.scala:285)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:258)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:249)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$35.applyOrElse(Analyzer.scala:2377)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$35.applyOrElse(Analyzer.scala:2373)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>       at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2373)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2360)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>       at 
> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>       at scala.collection.immutable.List.foldLeft(List.scala:84)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>       at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:255)
>       at org.apache.spark.sql.Dataset.<init>(Dataset.scala:206)
>       at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
>       at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
>       at org.apache.spark.sql.Dataset.as(Dataset.scala:380)
>       at 
> ReproduceSparkDecimalBug$.delayedEndpoint$ReproduceSparkDecimalBug$1(ReproduceSparkDecimalBug.scala:27)
>       at 
> ReproduceSparkDecimalBug$delayedInit$body.apply(ReproduceSparkDecimalBug.scala:4)
>       at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>       at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>       at scala.App$class.main(App.scala:76)
>       at ReproduceSparkDecimalBug$.main(ReproduceSparkDecimalBug.scala:4)
>       at ReproduceSparkDecimalBug.main(ReproduceSparkDecimalBug.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}
>   


