[
https://issues.apache.org/jira/browse/SPARK-46679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andoni Teso updated SPARK-46679:
--------------------------------
Description:
Since version 3.4, I've been experiencing the following error when using
encoders.
{code:java}
Exception in thread "main" java.util.NoSuchElementException: key not found: T
at scala.collection.immutable.Map$Map1.apply(Map.scala:163)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:121)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:60)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:53)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:62)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:179)
at org.apache.spark.sql.Encoders.bean(Encoders.scala)
at org.example.Main.main(Main.java:26) {code}
I'm attaching the code I use to reproduce the error locally. [^spark_test.zip]
The issue is in the JavaTypeInference class when it tries to find the encoder
for a ParameterizedType whose value is Team<T>. When running
JavaTypeUtils.getTypeArguments(pt).asScala.toMap, it returns the type variable T
again, but this time mapped to the Company type, while pt.getRawType is Team.
This ends up generating a (Team, Company) tuple in the typeVariables map,
leading to errors when looking up TypeVariables.
My example code is this:
{code:java}
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName("Spark Test")
.getOrCreate();
Dataset<PersonData> df = spark.read()
.option("header", "true")
.option("delimiter", ",")
.csv("src/main/resources/data/person-data.csv")
.as(Encoders.bean(PersonData.class));
Dataset<CompanyWrapper> companyWrapperDataset =
df.map((MapFunction<PersonData, CompanyWrapper>) personData -> {
Team<PersonData> team = new Team<>("TEAM NAME", personData);
return new CompanyWrapper("COMPANY NAME", team);
}, Encoders.bean(CompanyWrapper.class));
companyWrapperDataset.show(false);
}
} {code}
{code:java}
public class CompanyWrapper extends Company<PersonData> {
public CompanyWrapper() {
}
public CompanyWrapper(String name, Team<PersonData> team) {
super(name, team);
}
} {code}
{code:java}
public class Company<T> {
private String name;
private Team<T> team;
public Company() {
}
public Company(String name, Team<T> team) {
this.name = name;
this.team = team;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Team<T> getTeam() {
return team;
}
public void setTeam(Team<T> team) {
this.team = team;
}
@Override
public String toString() {
return "Company{" +
"name='" + name + '\'' +
", team=" + team +
'}';
}
}
{code}
{code:java}
public class Team<T> {
public String name;
public T person;
public Team() {
}
public Team(String name, T person) {
this.name = name;
this.person = person;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public T getPerson() {
return person;
}
public void setPerson(T person) {
this.person = person;
}
@Override
public String toString() {
return "Team{" +
"name='" + name + '\'' +
", person=" + person +
'}';
}
} {code}
{code:java}
public class PersonData implements Serializable {
private String id;
private String firstName;
private String lastName;
private String email;
private String phone;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
@Override
public String toString() {
return "PersonData{" +
"id='" + id + '\'' +
", firstName='" + firstName + '\'' +
", lastName='" + lastName + '\'' +
", email='" + email + '\'' +
", phone='" + phone + '\'' +
'}';
}
} {code}
was:
Since version 3.4, I've been experiencing the following error when using
encoders.
{code:java}
Exception in thread "main" java.util.NoSuchElementException: key not found: T
at scala.collection.immutable.Map$Map1.apply(Map.scala:163)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:121)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:60)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:53)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:62)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:179)
at org.apache.spark.sql.Encoders.bean(Encoders.scala)
at org.example.Main.main(Main.java:26) {code}
I'm attaching the code I use to reproduce the error locally. [^spark_test.zip]
The issue is in the JavaTypeInference class when it tries to find the encoder
for a ParameterizedType whose value is Team<T>. When running
JavaTypeUtils.getTypeArguments(pt).asScala.toMap, it returns the type variable T
again, but this time mapped to the Company type, while pt.getRawType is Team.
This ends up generating a (Team, Company) tuple in the typeVariables map,
leading to errors when looking up TypeVariables.
In my case, I've resolved this by doing the following:
{code:java}
case tv: TypeVariable[_] =>
encoderFor(typeVariables.head._2, seenTypeSet, typeVariables)
case pt: ParameterizedType =>
encoderFor(pt.getRawType, seenTypeSet, typeVariables) {code}
I haven't submitted a pull request because this doesn't seem to be the optimal
solution, and it might break some parts of the code. Additional validations or
conditions may need to be added.
> Encoders with multiple inheritance - Key not found: T
> -----------------------------------------------------
>
> Key: SPARK-46679
> URL: https://issues.apache.org/jira/browse/SPARK-46679
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.2, 3.5.0, 4.0.0
> Reporter: Andoni Teso
> Priority: Critical
> Attachments: spark_test.zip
>
>
> Since version 3.4, I've been experiencing the following error when using
> encoders.
> {code:java}
> Exception in thread "main" java.util.NoSuchElementException: key not found: T
> at scala.collection.immutable.Map$Map1.apply(Map.scala:163)
> at
> org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:121)
> at
> org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> at
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
> at scala.collection.TraversableLike.map(TraversableLike.scala:286)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
> at
> org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
> at
> org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> at
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
> at scala.collection.TraversableLike.map(TraversableLike.scala:286)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
> at
> org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
> at
> org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:60)
> at
> org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:53)
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:62)
> at org.apache.spark.sql.Encoders$.bean(Encoders.scala:179)
> at org.apache.spark.sql.Encoders.bean(Encoders.scala)
> at org.example.Main.main(Main.java:26) {code}
> I'm attaching the code I use to reproduce the error locally.
> [^spark_test.zip]
> The issue is in the JavaTypeInference class when it tries to find the encoder
> for a ParameterizedType with the value Team<T>. When running
> JavaTypeUtils.getTypeArguments(pt).asScala.toMap, it returns the type T
> again, but this time as a Company object, and pt.getRawType as Team. This
> ends up generating a tuple of Team, Company in the typeVariables map, leading
> to errors when searching for TypeVariables.
> My example code is this:
> {code:java}
> public class Main {
> public static void main(String[] args) {
> SparkSession spark = SparkSession.builder()
> .master("local[*]")
> .appName("Spark Test")
> .getOrCreate();
> Dataset<PersonData> df = spark.read()
> .option("header", "true")
> .option("delimiter", ",")
> .csv("src/main/resources/data/person-data.csv")
> .as(Encoders.bean(PersonData.class));
> Dataset<CompanyWrapper> companyWrapperDataset =
> df.map((MapFunction<PersonData, CompanyWrapper>) personData -> {
> Team<PersonData> team = new Team<>("TEAM NAME", personData);
> return new CompanyWrapper("COMPANY NAME", team);
> }, Encoders.bean(CompanyWrapper.class));
> companyWrapperDataset.show(false);
> }
> } {code}
>
> {code:java}
> public class CompanyWrapper extends Company<PersonData> {
> public CompanyWrapper() {
> }
> public CompanyWrapper(String name, Team<PersonData> team) {
> super(name, team);
> }
> } {code}
>
> {code:java}
> public class Company<T> {
> private String name;
> private Team<T> team;
> public Company() {
> }
> public Company(String name, Team<T> team) {
> this.name = name;
> this.team = team;
> }
> public String getName() {
> return name;
> }
> public void setName(String name) {
> this.name = name;
> }
> public Team<T> getTeam() {
> return team;
> }
> public void setTeam(Team<T> team) {
> this.team = team;
> }
> @Override
> public String toString() {
> return "Company{" +
> "name='" + name + '\'' +
> ", team=" + team +
> '}';
> }
> }
> {code}
> {code:java}
> public class Team<T> {
> public String name;
> public T person;
> public Team() {
> }
> public Team(String name, T person) {
> this.name = name;
> this.person = person;
> }
> public String getName() {
> return name;
> }
> public void setName(String name) {
> this.name = name;
> }
> public T getPerson() {
> return person;
> }
> public void setPerson(T person) {
> this.person = person;
> }
> @Override
> public String toString() {
> return "Team{" +
> "name='" + name + '\'' +
> ", person=" + person +
> '}';
> }
> } {code}
> {code:java}
> public class PersonData implements Serializable {
> private String id;
> private String firstName;
> private String lastName;
> private String email;
> private String phone;
> public String getId() {
> return id;
> }
> public void setId(String id) {
> this.id = id;
> }
> public String getFirstName() {
> return firstName;
> }
> public void setFirstName(String firstName) {
> this.firstName = firstName;
> }
> public String getLastName() {
> return lastName;
> }
> public void setLastName(String lastName) {
> this.lastName = lastName;
> }
> public String getEmail() {
> return email;
> }
> public void setEmail(String email) {
> this.email = email;
> }
> public String getPhone() {
> return phone;
> }
> public void setPhone(String phone) {
> this.phone = phone;
> }
> @Override
> public String toString() {
> return "PersonData{" +
> "id='" + id + '\'' +
> ", firstName='" + firstName + '\'' +
> ", lastName='" + lastName + '\'' +
> ", email='" + email + '\'' +
> ", phone='" + phone + '\'' +
> '}';
> }
> } {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]