comphead commented on issue #1289:
URL: https://github.com/apache/datafusion-comet/issues/1289#issuecomment-2701540775
Yeah, looks like the problem is in the shuffle. Managed to create a local test:
```
import java.io.File

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.internal.SQLConf

class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper {

  case class FullName(first: String, middle: String, last: String)
  case class Company(name: String, address: String)
  case class Employer(id: Int, company: Company)
  case class Contact(
      id: Int,
      name: FullName,
      address: String,
      pets: Int,
      friends: Array[FullName] = Array.empty,
      relatives: Map[String, FullName] = Map.empty,
      employer: Employer = null,
      relations: Map[FullName, String] = Map.empty)
  case class Department(depId: Int, depName: String, contactId: Int, employer: Employer)

  override protected def sparkConf: SparkConf =
    super.sparkConf.set(SQLConf.ANSI_ENABLED.key, "false")

  case class Employee(id: Int, name: FullName, employer: Company)

  val janeDoe: FullName = FullName("Jane", "X.", "Doe")
  val johnDoe: FullName = FullName("John", "Y.", "Doe")
  val susanSmith: FullName = FullName("Susan", "Z.", "Smith")

  val company: Company = Company("abc", "123 Business Street")
  val employer: Employer = Employer(0, company)
  val employerWithNullCompany: Employer = Employer(1, null)
  val employerWithNullCompany2: Employer = Employer(2, null)

  val contacts =
    Contact(
      0,
      janeDoe,
      "123 Main Street",
      1,
      friends = Array(susanSmith),
      relatives = Map("brother" -> johnDoe),
      employer = employer,
      relations = Map(johnDoe -> "brother")) ::
      Contact(
        1,
        johnDoe,
        "321 Wall Street",
        3,
        relatives = Map("sister" -> janeDoe),
        employer = employerWithNullCompany,
        relations = Map(janeDoe -> "sister")) :: Nil

  val departments =
    Department(0, "Engineering", 0, employer) ::
      Department(1, "Marketing", 1, employerWithNullCompany) ::
      Department(2, "Operation", 4, employerWithNullCompany2) :: Nil

  val employees = Employee(0, janeDoe, company) :: Employee(1, johnDoe, company) :: Nil

  case class Name(first: String, last: String)
  case class BriefContact(id: Int, name: Name, address: String)

  private val briefContacts =
    BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
      BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil

  // Runs the test body with the non-vectorized Parquet reader against the "contacts" views.
  protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
    test(s"Non-vectorized reader - without partition data column - $testName") {
      withSQLConf("spark.sql.parquet.enableVectorizedReader" -> "false") {
        withContacts(testThunk)
      }
    }
  }

  testSchemaPruning("select a single complex field array and in clause") {
    val query = sql("select friends.middle from contacts where friends.first[0] = 'Susan'")
    checkAnswer(query.orderBy("id"), Row(Array("Z.")) :: Nil)
  }

  protected def makeDataSourceFile[T <: Product: ClassTag: TypeTag](
      data: Seq[T],
      path: File): Unit = {
    spark
      .createDataFrame(data)
      .write
      .mode(SaveMode.Overwrite)
      .format("parquet")
      .save(path.getCanonicalPath)
  }

  // Writes the sample data as Parquet and registers the temp views used by the queries.
  private def withContacts(testThunk: => Unit): Unit = {
    withTempPath { dir =>
      val path = dir.getCanonicalPath

      makeDataSourceFile(contacts, new File(path + "/contacts/p=1"))
      makeDataSourceFile(briefContacts, new File(path + "/contacts/p=2"))
      makeDataSourceFile(departments, new File(path + "/departments"))

      // Provide a user-specified schema, since the schemas inferred from the different
      // data sources might differ.
      val schema = "`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
        "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
        "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
        "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
        "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
        "`last`: STRING>,STRING>,`p` INT"
      spark.read
        .format("parquet")
        .schema(schema)
        .load(path + "/contacts")
        .createOrReplaceTempView("contacts")

      val departmentSchema = "`depId` INT,`depName` STRING,`contactId` INT, " +
        "`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>"
      spark.read
        .format("parquet")
        .schema(departmentSchema)
        .load(path + "/departments")
        .createOrReplaceTempView("departments")

      testThunk
    }
  }
}
```
The test fails when native shuffle is enabled.
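For reference, a minimal sketch of how the `sparkConf` override in the suite above could be extended to turn on Comet native shuffle for the reproduction. The config keys here are assumptions based on the usual Comet shuffle setup and should be checked against the Comet documentation for the version under test:

```
// Sketch only: replaces the sparkConf override in the suite above.
// The shuffle-related keys are assumed from the Comet docs; verify them for your version.
override protected def sparkConf: SparkConf =
  super.sparkConf
    .set(SQLConf.ANSI_ENABLED.key, "false")
    .set(
      "spark.shuffle.manager",
      "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
    .set("spark.comet.exec.enabled", "true")
    .set("spark.comet.exec.shuffle.enabled", "true")
    .set("spark.comet.exec.shuffle.mode", "native")
```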