This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ecda35bf53 [GLUTEN-8497][VL] A bad test case that fails columnar table cache query (#8498)
ecda35bf53 is described below
commit ecda35bf5314c9d15515133309747ebe69f000e5
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Jan 13 10:02:44 2025 +0800
[GLUTEN-8497][VL] A bad test case that fails columnar table cache query (#8498)
---
.../gluten/execution/VeloxColumnarCacheSuite.scala | 43 ++++++++++++++++++++--
.../columnar/transition/TransitionGraph.scala | 1 +
2 files changed, 41 insertions(+), 3 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index e9151ad84a..8c7be883bb 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -24,8 +24,11 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, StructType}
import org.apache.spark.storage.StorageLevel
+import scala.collection.JavaConverters._
+
class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
override protected val resourcePath: String = "/tpch-data-parquet"
override protected val fileFormat: String = "parquet"
@@ -55,7 +58,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
)
}
- test("input columnar batch") {
+ test("Input columnar batch") {
TPCHTables.map(_.name).foreach {
table =>
runQueryAndCompare(s"SELECT * FROM $table", cache = true) {
@@ -64,7 +67,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
}
}
- test("input columnar batch and column pruning") {
+ test("Input columnar batch and column pruning") {
val expected = sql("SELECT l_partkey FROM lineitem").collect()
val cached = sql("SELECT * FROM lineitem").cache()
try {
@@ -85,7 +88,7 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
}
}
- test("input vanilla Spark columnar batch") {
+ test("Input vanilla Spark columnar batch") {
withSQLConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false") {
val df = spark.table("lineitem")
val expected = df.collect()
@@ -98,6 +101,40 @@ class VeloxColumnarCacheSuite extends VeloxWholeStageTransformerSuite with Adapt
}
}
+ // TODO: Fix this case. See https://github.com/apache/incubator-gluten/issues/8497.
+ testWithSpecifiedSparkVersion("Input fallen back vanilla Spark columnar scan", Some("3.3")) {
+ def withId(id: Int): Metadata =
+ new MetadataBuilder().putLong("parquet.field.id", id).build()
+
+ withTempDir {
+ dir =>
+ val readSchema =
+ new StructType()
+ .add("l_orderkey_read", LongType, true, withId(1))
+ val writeSchema =
+ new StructType()
+ .add("l_orderkey_write", LongType, true, withId(1))
+ withSQLConf("spark.sql.parquet.fieldId.read.enabled" -> "true") {
+ // Write a table with metadata information that Gluten Velox backend doesn't support,
+ // to emulate the scenario that a Spark columnar scan is not offload-able so fallen back,
+ // then user tries to cache it.
+ spark
+ .createDataFrame(
spark.sql("select l_orderkey from lineitem").collect().toList.asJava,
+ writeSchema)
+ .write
+ .mode("overwrite")
+ .parquet(dir.getCanonicalPath)
+ val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+ df.cache()
+ // FIXME: The following call will throw since ColumnarCachedBatchSerializer will be
+ // confused by the input vanilla Parquet scan when its #convertColumnarBatchToCachedBatch
+ // method is called.
+ assertThrows[Exception](df.collect())
+ }
+ }
+ }
+
test("CachedColumnarBatch serialize and deserialize") {
val df = spark.table("lineitem")
val expected = df.collect()
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
index 2733ed9f4f..ef08a34d56 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala
@@ -77,6 +77,7 @@ object TransitionGraph {
}
}
+ // TODO: Consolidate transition graph's cost model with RAS cost model.
private object TransitionCostModel extends
FloydWarshallGraph.CostModel[Transition] {
override def zero(): TransitionCost = TransitionCost(0, Nil)
override def costOf(transition: Transition): TransitionCost = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]