[ https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211846#comment-16211846 ]
Russell Spitzer commented on SPARK-22316:
-----------------------------------------
You are right, I meant to have createDataset up there. I'll modify the example.
This shouldn't be Dataset-specific, since the underlying issue doesn't really
depend on the encoders; it's just that the automatically generated name
cannot be used to select the column. After all, a DataFrame is just a
Dataset[Row] :).
I haven't really looked into this, but I'm guessing it's the "(" characters in
the auto-generated column name.
Here is an example with DataFrames for good measure:
{code}
import spark.implicits._ // already in scope in spark-shell; provides the Int key encoder for groupByKey
case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)
val ds = spark.createDataFrame(Seq(Customer(1, Person("russ", 85))))
val grouped = ds.groupByKey(c => c.getInt(0)).reduceGroups((x, y) => x)
grouped(grouped.columns(1))
/**org.apache.spark.sql.AnalysisException: Cannot resolve column name
"ReduceAggregator(org.apache.spark.sql.Row)" among (value,
ReduceAggregator(org.apache.spark.sql.Row));
**/
{code}
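The "toDF" renaming workaround from the original report should apply to this DataFrame as well. A minimal sketch that avoids hand-picking names, assuming (as the report's toDF("key", "reduced") example suggests) that names made only of letters, digits and underscores can be selected: the hypothetical helper SafeColumnNames.sanitize below (not a Spark API) replaces every other character, including "(" and ".", with "_".
{code:scala}
// Hypothetical helper, not part of Spark: turns arbitrary column names into
// names made only of letters, digits and underscores.
object SafeColumnNames {
  def sanitize(names: Seq[String]): Seq[String] = {
    // Replace "(", ")", ".", spaces, etc. with "_".
    val replaced = names.map(_.replaceAll("[^A-Za-z0-9_]", "_"))
    // If two names collapse to the same string, append the column position to each of them.
    replaced.zipWithIndex.map { case (name, i) =>
      if (replaced.count(_ == name) > 1) s"${name}_$i" else name
    }
  }
}

// Usage in spark-shell, with `grouped` from the example above (a sketch, not verified here):
// val renamed = grouped.toDF(SafeColumnNames.sanitize(grouped.columns.toSeq): _*)
// renamed.select(renamed.columns(1))
{code}
With the DataFrame above, "ReduceAggregator(org.apache.spark.sql.Row)" would be renamed to "ReduceAggregator_org_apache_spark_sql_Row_".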
> Cannot Select ReducedAggregator Column
> --------------------------------------
>
> Key: SPARK-22316
> URL: https://issues.apache.org/jira/browse/SPARK-22316
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: Russell Spitzer
> Priority: Minor
>
> Given a Dataset that has been run through reduceGroups like this:
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataset(Seq(Customer(1, Person("russ", 85))))
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema:
> {code}
> org.apache.spark.sql.types.StructType =
>   StructType(
>     StructField(value,IntegerType,false),
>     StructField(ReduceAggregator(Customer),
>       StructType(StructField(id,IntegerType,false),
>         StructField(person,
>           StructType(StructField(name,StringType,true),
>             StructField(age,IntegerType,false))
>         ,true))
>     ,true))
> {code}
> The column names are:
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregator(Customer)" column:
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve
> '`ReduceAggregator(Customer)`' given input columns: [value,
> ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338,
> reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573,
> Some(newInstance(class Customer)), Some(class Customer),
> Some(StructType(StructField(id,IntegerType,false),
> StructField(person,StructType(StructField(name,StringType,true),
> StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1
> AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals))
> null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2,
> true]._2)).id AS id#195, person, if
> (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2,
> true]._2)).person)) null else named_struct(name, staticinvoke(class
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2,
> true]._2)).person).name, true), age,
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2,
> true]._2)).person).age) AS person#196) AS _2#341, newInstance(class
> scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS
> id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer,
> true])).person)) null else named_struct(name, staticinvoke(class
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer,
> true])).person).name, true), age,
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer,
> true])).person).age) AS person#196, StructField(id,IntegerType,false),
> StructField(person,StructType(StructField(name,StringType,true),
> StructField(age,IntegerType,false)),true), true, 0, 0) AS
> ReduceAggregator(Customer)#346]
> +- AppendColumns <function1>, class Customer,
> [StructField(id,IntegerType,false),
> StructField(person,StructType(StructField(name,StringType,true),
> StructField(age,IntegerType,false)),true)], newInstance(class Customer),
> [input[0, int, false] AS value#338]
> +- LocalRelation [id#197, person#198]
> {code}
> You can work around this by using "toDF" to rename the columns:
> {code}
> scala> grouped.toDF("key", "reduced").select("reduced")
> res55: org.apache.spark.sql.DataFrame = [reduced: struct<id: int, person:
> struct<name: string, age: int>>]
> {code}
> I think that
> {code}
> ds.select(ds.columns(i))
> {code}
> should work for every valid index i < ds.columns.length.
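The expectation above can be phrased as a check over every column name. The sketch below is hypothetical (SelectableColumnsCheck and unselectable are made-up names, not Spark APIs): it takes the column names plus a select function and returns the names whose selection throws, so for a Dataset ds it would be called with ds.columns.toSeq and name => ds.select(name), and an empty result would mean the expectation holds.
{code:scala}
import scala.util.Try

// Hypothetical helper: returns every column name for which `select` throws.
// Taking `select` as a function keeps the check independent of any particular Dataset.
object SelectableColumnsCheck {
  def unselectable(columns: Seq[String])(select: String => Any): Seq[String] =
    columns.filter(name => Try(select(name)).isFailure)
}

// Usage in spark-shell against `grouped` from the report; based on the error above,
// on 2.2.0 this would be expected to return Seq("ReduceAggregator(Customer)"):
// SelectableColumnsCheck.unselectable(grouped.columns.toSeq)(name => grouped.select(name))
{code}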
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)