comphead commented on issue #1289:
URL: 
https://github.com/apache/datafusion-comet/issues/1289#issuecomment-2701540775

   Yeah, looks like the problem is in the shuffle
   
   Managed to create a local test:
   
   ```
   /**
    * Local reproduction suite: writes nested (struct / array / map) data to Parquet, registers it
    * as temp views with an explicit schema, and queries a field of an array-of-struct column with
    * the vectorized Parquet reader disabled.
    *
    * NOTE(review): imports are not part of this snippet; the Spark and Comet test classes it
    * references must be imported by whoever pastes it into a source tree.
    */
   class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     case class FullName(first: String, middle: String, last: String)
     case class Company(name: String, address: String)
     case class Employer(id: Int, company: Company)
     case class Contact(
         id: Int,
         name: FullName,
         address: String,
         pets: Int,
         friends: Array[FullName] = Array.empty,
         relatives: Map[String, FullName] = Map.empty,
         employer: Employer = null,
         relations: Map[FullName, String] = Map.empty)
     case class Department(depId: Int, depName: String, contactId: Int, employer: Employer)
   
     /** Base configuration with ANSI mode explicitly switched off. */
     override protected def sparkConf: SparkConf =
       super.sparkConf.set(SQLConf.ANSI_ENABLED.key, "false")
   
     case class Employee(id: Int, name: FullName, employer: Company)
   
     val janeDoe: FullName = FullName("Jane", "X.", "Doe")
     val johnDoe: FullName = FullName("John", "Y.", "Doe")
     val susanSmith: FullName = FullName("Susan", "Z.", "Smith")
   
     val company: Company = Company("abc", "123 Business Street")
   
     val employer: Employer = Employer(0, company)
     val employerWithNullCompany: Employer = Employer(1, null)
     val employerWithNullCompany2: Employer = Employer(2, null)
   
     // Two contacts; only the first one has a non-empty `friends` array.
     val contacts =
       Contact(
         0,
         janeDoe,
         "123 Main Street",
         1,
         friends = Array(susanSmith),
         relatives = Map("brother" -> johnDoe),
         employer = employer,
         relations = Map(johnDoe -> "brother")) ::
         Contact(
           1,
           johnDoe,
           "321 Wall Street",
           3,
           relatives = Map("sister" -> janeDoe),
           employer = employerWithNullCompany,
           relations = Map(janeDoe -> "sister")) :: Nil
   
     val departments =
       Department(0, "Engineering", 0, employer) ::
         Department(1, "Marketing", 1, employerWithNullCompany) ::
         Department(2, "Operation", 4, employerWithNullCompany2) :: Nil
   
     val employees = Employee(0, janeDoe, company) :: Employee(1, johnDoe, company) :: Nil
   
     case class Name(first: String, last: String)
     case class BriefContact(id: Int, name: Name, address: String)
   
     // Contacts with a narrower shape (no middle name, no nested collections); written to the
     // `p=2` directory by `withContacts`.
     private val briefContacts =
       BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
         BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
   
     /**
      * Registers a test that runs `testThunk` against the views created by `withContacts`, with
      * `spark.sql.parquet.enableVectorizedReader` set to `false`.
      *
      * @param testName  suffix appended to the registered test's name
      * @param testThunk test body; evaluated inside the temp-view fixture
      */
     protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
       test(s"Non-vectorized reader - without partition data column - $testName") {
         withSQLConf("spark.sql.parquet.enableVectorizedReader" -> "false") {
           withContacts(testThunk)
         }
       }
     }
   
     testSchemaPruning("select a single complex field array and in clause") {
       val query = sql("select friends.middle from contacts where friends.first[0] = 'Susan'")
   
       checkAnswer(query.orderBy("id"), Row(Array("Z.")) :: Nil)
     }
   
     /**
      * Writes `data` as Parquet to `path`, overwriting whatever is already there.
      *
      * @param data rows to write
      * @param path destination directory
      */
     protected def makeDataSourceFile[T <: Product: ClassTag: TypeTag](
         data: Seq[T],
         path: File): Unit = {
       spark
         .createDataFrame(data)
         .write
         .mode(SaveMode.Overwrite)
         .format("parquet")
         .save(path.getCanonicalPath)
     }
   
     /**
      * Fixture: writes the sample data under a temporary directory, exposes it as the temp views
      * `contacts` and `departments`, then runs `testThunk`.
      *
      * @param testThunk code to run while the temp views are registered
      */
     private def withContacts(testThunk: => Unit): Unit = {
       withTempPath { dir =>
         val path = dir.getCanonicalPath
   
         makeDataSourceFile(contacts, new File(path + "/contacts/p=1"))
         makeDataSourceFile(briefContacts, new File(path + "/contacts/p=2"))
         makeDataSourceFile(departments, new File(path + "/departments"))
   
         // Providing user specified schema. Inferred schema from different data sources might
         // be different.
         val schema = "`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
           "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
           "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
           "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
           "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
           "`last`: STRING>,STRING>,`p` INT"
         spark.read
           .format("parquet")
           .schema(schema)
           .load(path + "/contacts")
           .createOrReplaceTempView("contacts")
   
         val departmentSchema = "`depId` INT,`depName` STRING,`contactId` INT, " +
           "`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>"
         spark.read
           .format("parquet")
           .schema(departmentSchema)
           .load(path + "/departments")
           .createOrReplaceTempView("departments")
   
         testThunk
       }
     }
   }
   ```
   
   It fails when the native shuffle is enabled.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to