This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new fb6e525 [fix](connector) Fix the issue when Spark pushes down in
'case when' (#300)
fb6e525 is described below
commit fb6e525fdc34d6fb8c1552ecdf8b41e4a0863685
Author: aoyuEra <[email protected]>
AuthorDate: Mon Mar 31 22:25:38 2025 +0800
[fix](connector) Fix the issue when Spark pushes down in 'case when' (#300)
---
.../apache/doris/spark/sql/DorisReaderITCase.scala | 39 ++++++++++++++++++++++
.../read/expression/V2ExpressionBuilder.scala | 15 +++++++++
.../read/expression/V2ExpressionBuilder.scala | 15 +++++++++
.../read/expression/V2ExpressionBuilder.scala | 15 +++++++++
4 files changed, 84 insertions(+)
diff --git
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 07c998b..b5d311f 100644
---
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -427,4 +427,43 @@ class DorisReaderITCase(readMode: String, flightSqlPort:
Int) extends AbstractCo
assert("List([3])".equals(likeFilter.toList.toString()))
session.stop()
}
+
+
+ @Test
+ def buildCaseWhenTest(): Unit = {
+ val sourceInitSql: Array[String] =
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG,
sourceInitSql: _*)
+
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+ |)
+ |""".stripMargin)
+
+ val resultData = session.sql(
+ """
+ |select * from (
+ | select
+ | id,
+ | (case when c5 > 10 then c2 else null end) as cc1,
+ | (case when c4 < 5 then c3 else null end) as cc2
+ | from test_source where c2 is not null
+ |) where !(cc1 is null and cc2 is null) order by id
+ |""".stripMargin)
+
+ assert("List([1,127,null], [2,null,-32768],
[3,null,0])".equals(resultData.collect().toList.toString()))
+
+ session.stop()
+
+ }
}
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index f7e08b9..cba20f1 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -57,6 +57,21 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
case "<=" => s"`${build(e.children()(0))}` <=
${build(e.children()(1))}"
case ">" => s"`${build(e.children()(0))}` >
${build(e.children()(1))}"
case ">=" => s"`${build(e.children()(0))}` >=
${build(e.children()(1))}"
+ case "CASE_WHEN" =>
+ val fragment = new StringBuilder("CASE ")
+ val expressions = e.children()
+
+ for(i<- 0 until expressions.size - 1 by 2){
+ fragment.append(s" WHEN ${build(expressions(i))} THEN
${build(expressions(i+1))} ")
+ }
+
+ if (expressions.length % 2 != 0) {
+ val last = expressions(expressions.length - 1)
+ fragment.append(s" ELSE ${build(last)} ")
+ }
+ fragment.append(" END")
+
+ fragment.mkString
case _ => null
}
}
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index f13830c..1913a51 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -55,6 +55,21 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
case "<=" => s"`${build(e.children()(0))}` <=
${build(e.children()(1))}"
case ">" => s"`${build(e.children()(0))}` >
${build(e.children()(1))}"
case ">=" => s"`${build(e.children()(0))}` >=
${build(e.children()(1))}"
+ case "CASE_WHEN" =>
+ val fragment = new StringBuilder("CASE ")
+ val expressions = e.children()
+
+ for(i<- 0 until expressions.size - 1 by 2){
+ fragment.append(s" WHEN ${build(expressions(i))} THEN
${build(expressions(i+1))} ")
+ }
+
+ if (expressions.length % 2 != 0) {
+ val last = expressions(expressions.length - 1)
+ fragment.append(s" ELSE ${build(last)} ")
+ }
+ fragment.append(" END")
+
+ fragment.mkString
case _ => null
}
}
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index f13830c..1913a51 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -55,6 +55,21 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
case "<=" => s"`${build(e.children()(0))}` <=
${build(e.children()(1))}"
case ">" => s"`${build(e.children()(0))}` >
${build(e.children()(1))}"
case ">=" => s"`${build(e.children()(0))}` >=
${build(e.children()(1))}"
+ case "CASE_WHEN" =>
+ val fragment = new StringBuilder("CASE ")
+ val expressions = e.children()
+
+ for(i<- 0 until expressions.size - 1 by 2){
+ fragment.append(s" WHEN ${build(expressions(i))} THEN
${build(expressions(i+1))} ")
+ }
+
+ if (expressions.length % 2 != 0) {
+ val last = expressions(expressions.length - 1)
+ fragment.append(s" ELSE ${build(last)} ")
+ }
+ fragment.append(" END")
+
+ fragment.mkString
case _ => null
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]