[ 
https://issues.apache.org/jira/browse/SPARK-46679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andoni Teso updated SPARK-46679:
--------------------------------
    Description: 
Since version 3.4, I've been experiencing the following error when using 
encoders.
{code:java}
Exception in thread "main" java.util.NoSuchElementException: key not found: T
    at scala.collection.immutable.Map$Map1.apply(Map.scala:163)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:121)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
    at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
    at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:60)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:53)
    at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:62)
    at org.apache.spark.sql.Encoders$.bean(Encoders.scala:179)
    at org.apache.spark.sql.Encoders.bean(Encoders.scala)
    at org.example.Main.main(Main.java:26) {code}
I'm attaching the code I use to reproduce the error locally.  [^spark_test.zip]

The issue is in the JavaTypeInference class when it tries to find the encoder 
for a ParameterizedType whose value is Team<T>. When 
JavaTypeUtils.getTypeArguments(pt).asScala.toMap is run, it returns the type 
variable T again, but this time resolved to a Company object, while 
pt.getRawType returns Team. This ends up putting a (Team, Company) pair into 
the typeVariables map, which leads to errors when looking up TypeVariables.

My example code is this:
{code:java}
public class Main {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("Spark Test")
                .getOrCreate();

        Dataset<PersonData> df = spark.read()
                .option("header", "true")
                .option("delimiter", ",")
                .csv("src/main/resources/data/person-data.csv")
                .as(Encoders.bean(PersonData.class));

        Dataset<CompanyWrapper> companyWrapperDataset = 
df.map((MapFunction<PersonData, CompanyWrapper>) personData -> {
            Team<PersonData> team = new Team<>("TEAM NAME", personData);
            return new CompanyWrapper("COMPANY NAME", team);
        }, Encoders.bean(CompanyWrapper.class));

        companyWrapperDataset.show(false);

    }
} {code}
 
{code:java}
public class CompanyWrapper extends Company<PersonData> {

    public CompanyWrapper() {
    }

    public CompanyWrapper(String name, Team<PersonData> team) {
        super(name, team);
    }

} {code}
 
{code:java}
public class Company<T> {

    private String name;
    private Team<T> team;

    public Company() {
    }

    public Company(String name, Team<T> team) {
        this.name = name;
        this.team = team;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Team<T> getTeam() {
        return team;
    }

    public void setTeam(Team<T> team) {
        this.team = team;
    }

    @Override
    public String toString() {
        return "Company{" +
                "name='" + name + '\'' +
                ", team=" + team +
                '}';
    }
}
 {code}
{code:java}
public class Team<T> {

    public String name;
    public T person;

    public Team() {
    }

    public Team(String name, T person) {
        this.name = name;
        this.person = person;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public T getPerson() {
        return person;
    }

    public void setPerson(T person) {
        this.person = person;
    }

    @Override
    public String toString() {
        return "Team{" +
                "name='" + name + '\'' +
                ", person=" + person +
                '}';
    }
} {code}
{code:java}
public class PersonData implements Serializable {

    private String id;

    private String firstName;

    private String lastName;

    private String email;

    private String phone;

    public String getId() {
       return id;
    }

    public void setId(String id) {
       this.id = id;
    }

    public String getFirstName() {
       return firstName;
    }

    public void setFirstName(String firstName) {
       this.firstName = firstName;
    }

    public String getLastName() {
       return lastName;
    }

    public void setLastName(String lastName) {
       this.lastName = lastName;
    }

    public String getEmail() {
       return email;
    }

    public void setEmail(String email) {
       this.email = email;
    }

    public String getPhone() {
       return phone;
    }

    public void setPhone(String phone) {
       this.phone = phone;
    }

    @Override
    public String toString() {
       return "PersonData{" +
             "id='" + id + '\'' +
             ", firstName='" + firstName + '\'' +
             ", lastName='" + lastName + '\'' +
             ", email='" + email + '\'' +
             ", phone='" + phone + '\'' +
             '}';
    }
} {code}
 

  was:
Since version 3.4, I've been experiencing the following error when using 
encoders.
{code:java}
Exception in thread "main" java.util.NoSuchElementException: key not found: T
    at scala.collection.immutable.Map$Map1.apply(Map.scala:163)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:121)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
    at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
    at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:60)
    at 
org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:53)
    at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:62)
    at org.apache.spark.sql.Encoders$.bean(Encoders.scala:179)
    at org.apache.spark.sql.Encoders.bean(Encoders.scala)
    at org.example.Main.main(Main.java:26) {code}
I'm attaching the code I use to reproduce the error locally.  [^spark_test.zip]

The issue is in the JavaTypeInference class when it tries to find the encoder 
for a ParameterizedType whose value is Team<T>. When 
JavaTypeUtils.getTypeArguments(pt).asScala.toMap is run, it returns the type 
variable T again, but this time resolved to a Company object, while 
pt.getRawType returns Team. This ends up putting a (Team, Company) pair into 
the typeVariables map, which leads to errors when looking up TypeVariables.

In my case, I've resolved this by doing the following:
{code:java}
case tv: TypeVariable[_] =>
  encoderFor(typeVariables.head._2, seenTypeSet, typeVariables)

case pt: ParameterizedType =>
  encoderFor(pt.getRawType, seenTypeSet, typeVariables) {code}
I haven't submitted a pull request because this doesn't seem to be an optimal 
solution, and it might break other parts of the code. Additional validations 
or conditions may need to be added.


> Encoders with multiple inheritance - Key not found: T
> -----------------------------------------------------
>
>                 Key: SPARK-46679
>                 URL: https://issues.apache.org/jira/browse/SPARK-46679
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.2, 3.5.0, 4.0.0
>            Reporter: Andoni Teso
>            Priority: Critical
>         Attachments: spark_test.zip
>
>
> Since version 3.4, I've been experiencing the following error when using 
> encoders.
> {code:java}
> Exception in thread "main" java.util.NoSuchElementException: key not found: T
>     at scala.collection.immutable.Map$Map1.apply(Map.scala:163)
>     at 
> org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:121)
>     at 
> org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>     at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
>     at 
> org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
>     at 
> org.apache.spark.sql.catalyst.JavaTypeInference$.$anonfun$encoderFor$1(JavaTypeInference.scala:140)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>     at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
>     at 
> org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:138)
>     at 
> org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:60)
>     at 
> org.apache.spark.sql.catalyst.JavaTypeInference$.encoderFor(JavaTypeInference.scala:53)
>     at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:62)
>     at org.apache.spark.sql.Encoders$.bean(Encoders.scala:179)
>     at org.apache.spark.sql.Encoders.bean(Encoders.scala)
>     at org.example.Main.main(Main.java:26) {code}
> I'm attaching the code I use to reproduce the error locally.  
> [^spark_test.zip]
> The issue is in the JavaTypeInference class when it tries to find the encoder 
> for a ParameterizedType whose value is Team<T>. When 
> JavaTypeUtils.getTypeArguments(pt).asScala.toMap is run, it returns the type 
> variable T again, but this time resolved to a Company object, while 
> pt.getRawType returns Team. This ends up putting a (Team, Company) pair into 
> the typeVariables map, which leads to errors when looking up TypeVariables.
> My example code is this:
> {code:java}
> public class Main {
>     public static void main(String[] args) {
>         SparkSession spark = SparkSession.builder()
>                 .master("local[*]")
>                 .appName("Spark Test")
>                 .getOrCreate();
>         Dataset<PersonData> df = spark.read()
>                 .option("header", "true")
>                 .option("delimiter", ",")
>                 .csv("src/main/resources/data/person-data.csv")
>                 .as(Encoders.bean(PersonData.class));
>         Dataset<CompanyWrapper> companyWrapperDataset = 
> df.map((MapFunction<PersonData, CompanyWrapper>) personData -> {
>             Team<PersonData> team = new Team<>("TEAM NAME", personData);
>             return new CompanyWrapper("COMPANY NAME", team);
>         }, Encoders.bean(CompanyWrapper.class));
>         companyWrapperDataset.show(false);
>     }
> } {code}
>  
> {code:java}
> public class CompanyWrapper extends Company<PersonData> {
>     public CompanyWrapper() {
>     }
>     public CompanyWrapper(String name, Team<PersonData> team) {
>         super(name, team);
>     }
> } {code}
>  
> {code:java}
> public class Company<T> {
>     private String name;
>     private Team<T> team;
>     public Company() {
>     }
>     public Company(String name, Team<T> team) {
>         this.name = name;
>         this.team = team;
>     }
>     public String getName() {
>         return name;
>     }
>     public void setName(String name) {
>         this.name = name;
>     }
>     public Team<T> getTeam() {
>         return team;
>     }
>     public void setTeam(Team<T> team) {
>         this.team = team;
>     }
>     @Override
>     public String toString() {
>         return "Company{" +
>                 "name='" + name + '\'' +
>                 ", team=" + team +
>                 '}';
>     }
> }
>  {code}
> {code:java}
> public class Team<T> {
>     public String name;
>     public T person;
>     public Team() {
>     }
>     public Team(String name, T person) {
>         this.name = name;
>         this.person = person;
>     }
>     public String getName() {
>         return name;
>     }
>     public void setName(String name) {
>         this.name = name;
>     }
>     public T getPerson() {
>         return person;
>     }
>     public void setPerson(T person) {
>         this.person = person;
>     }
>     @Override
>     public String toString() {
>         return "Team{" +
>                 "name='" + name + '\'' +
>                 ", person=" + person +
>                 '}';
>     }
> } {code}
> {code:java}
> public class PersonData implements Serializable {
>     private String id;
>     private String firstName;
>     private String lastName;
>     private String email;
>     private String phone;
>     public String getId() {
>        return id;
>     }
>     public void setId(String id) {
>        this.id = id;
>     }
>     public String getFirstName() {
>        return firstName;
>     }
>     public void setFirstName(String firstName) {
>        this.firstName = firstName;
>     }
>     public String getLastName() {
>        return lastName;
>     }
>     public void setLastName(String lastName) {
>        this.lastName = lastName;
>     }
>     public String getEmail() {
>        return email;
>     }
>     public void setEmail(String email) {
>        this.email = email;
>     }
>     public String getPhone() {
>        return phone;
>     }
>     public void setPhone(String phone) {
>        this.phone = phone;
>     }
>     @Override
>     public String toString() {
>        return "PersonData{" +
>              "id='" + id + '\'' +
>              ", firstName='" + firstName + '\'' +
>              ", lastName='" + lastName + '\'' +
>              ", email='" + email + '\'' +
>              ", phone='" + phone + '\'' +
>              '}';
>     }
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to