This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e4cd1463a [spark] Bump Spark version 3.4.2 (#2838)
e4cd1463a is described below

commit e4cd1463ad39836a0a56f7fe70de9cfd4475c47f
Author: cxzl25 <[email protected]>
AuthorDate: Thu Feb 22 21:14:47 2024 +0800

    [spark] Bump Spark version 3.4.2 (#2838)
---
 .../paimon/spark/sql/PaimonOptimizationTest.scala  |  7 ++-
 paimon-spark/paimon-spark-3.4/pom.xml              |  2 +-
 .../paimon/spark/sql/PaimonOptimizationTest.scala  |  7 ++-
 .../paimon/spark/sql/PaimonOptimizationTest.scala  |  7 ++-
 .../MergePaimonScalarSubqueriersBase.scala         |  8 +++-
 .../paimon/spark/util/CTERelationRefUtils.scala    | 52 ++++++++++++++++++++++
 6 files changed, 74 insertions(+), 9 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
index 6e4787627..0a4dfb769 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -18,9 +18,10 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.spark.util.CTERelationRefUtils
+
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField, 
NamedExpression, ScalarSubquery}
-import org.apache.spark.sql.catalyst.plans.logical.CTERelationRef
 
 class PaimonOptimizationTest extends PaimonOptimizationTestBase {
 
@@ -28,7 +29,9 @@ class PaimonOptimizationTest extends 
PaimonOptimizationTestBase {
       cteIndex: Int,
       output: Seq[Attribute],
       fieldIndex: Int): NamedExpression = {
-    GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true, output)), fieldIndex)
+    GetStructField(
+      ScalarSubquery(CTERelationRefUtils.createCTERelationRef(cteIndex, _resolved = true, output)),
+      fieldIndex)
       .as("scalarsubquery()")
   }
 }
diff --git a/paimon-spark/paimon-spark-3.4/pom.xml 
b/paimon-spark/paimon-spark-3.4/pom.xml
index bfbfb2db1..8fadb8ab0 100644
--- a/paimon-spark/paimon-spark-3.4/pom.xml
+++ b/paimon-spark/paimon-spark-3.4/pom.xml
@@ -32,7 +32,7 @@ under the License.
     <name>Paimon : Spark : 3.4</name>
 
     <properties>
-        <spark.version>3.4.1</spark.version>
+        <spark.version>3.4.2</spark.version>
     </properties>
 
     <dependencies>
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
index 270ec57d6..f5ee0706c 100644
--- 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -18,8 +18,9 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.spark.util.CTERelationRefUtils
+
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField, 
NamedExpression, ScalarSubquery}
-import org.apache.spark.sql.catalyst.plans.logical.CTERelationRef
 
 class PaimonOptimizationTest extends PaimonOptimizationTestBase {
 
@@ -29,7 +30,9 @@ class PaimonOptimizationTest extends 
PaimonOptimizationTestBase {
       cteIndex: Int,
       output: Seq[Attribute],
       fieldIndex: Int): NamedExpression = {
-    GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true, output)), fieldIndex)
+    GetStructField(
+      ScalarSubquery(CTERelationRefUtils.createCTERelationRef(cteIndex, _resolved = true, output)),
+      fieldIndex)
       .as("scalarsubquery()")
   }
 }
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
index 6e4787627..0a4dfb769 100644
--- 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -18,9 +18,10 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.spark.util.CTERelationRefUtils
+
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField, 
NamedExpression, ScalarSubquery}
-import org.apache.spark.sql.catalyst.plans.logical.CTERelationRef
 
 class PaimonOptimizationTest extends PaimonOptimizationTestBase {
 
@@ -28,7 +29,9 @@ class PaimonOptimizationTest extends 
PaimonOptimizationTestBase {
       cteIndex: Int,
       output: Seq[Attribute],
       fieldIndex: Int): NamedExpression = {
-    GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true, output)), fieldIndex)
+    GetStructField(
+      ScalarSubquery(CTERelationRefUtils.createCTERelationRef(cteIndex, _resolved = true, output)),
+      fieldIndex)
       .as("scalarsubquery()")
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
index 2bc422525..49cd5e907 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
@@ -19,10 +19,11 @@
 package org.apache.paimon.spark.catalyst.optimizer
 
 import org.apache.paimon.spark.PaimonScan
+import org.apache.paimon.spark.util.CTERelationRefUtils
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeMap, CreateNamedStruct, Expression, ExprId, GetStructField, 
LeafExpression, Literal, NamedExpression, PredicateHelper, ScalarSubquery, 
Unevaluable}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationDef, 
CTERelationRef, Filter, Join, LogicalPlan, Project, Subquery, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationDef, 
Filter, Join, LogicalPlan, Project, Subquery, WithCTE}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{SCALAR_SUBQUERY, 
SCALAR_SUBQUERY_REFERENCE, TreePattern}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
@@ -361,7 +362,10 @@ trait MergePaimonScalarSubqueriersBase extends 
Rule[LogicalPlan] with PredicateH
               val subqueryCTE = header.plan.asInstanceOf[CTERelationDef]
               GetStructField(
                 createScalarSubquery(
-                  CTERelationRef(subqueryCTE.id, _resolved = true, subqueryCTE.output),
+                  CTERelationRefUtils.createCTERelationRef(
+                    subqueryCTE.id,
+                    _resolved = true,
+                    subqueryCTE.output),
                   ssr.exprId),
                 ssr.headerIndex)
             } else {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/CTERelationRefUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/CTERelationRefUtils.scala
new file mode 100644
index 000000000..8b8abe7c6
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/CTERelationRefUtils.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.CTERelationRef
+
+object CTERelationRefUtils {
+
+  private val (ctorm, hasStreamingField) = init()
+
+  private def init() = {
+    val ru = scala.reflect.runtime.universe
+    val m = ru.runtimeMirror(getClass.getClassLoader)
+    val classC = ru.typeOf[CTERelationRef].typeSymbol.asClass
+    val cm = m.reflectClass(classC)
+    val ctorC = ru.typeOf[CTERelationRef].decl(ru.termNames.CONSTRUCTOR).asMethod
+    val ctorm = cm.reflectConstructor(ctorC)
+    // SPARK-46062 add isStreaming param
+    val hasStreamingField =
+      ctorC.paramLists.head.exists(_.name.encodedName.toString == "isStreaming")
+    (ctorm, hasStreamingField)
+  }
+
+  def createCTERelationRef(
+      cteId: Long,
+      _resolved: Boolean,
+      output: Seq[Attribute]): CTERelationRef = {
+    val value = if (hasStreamingField) {
+      ctorm(cteId, _resolved, output, false, None)
+    } else {
+      ctorm(cteId, _resolved, output, None)
+    }
+    value.asInstanceOf[CTERelationRef]
+  }
+}

Reply via email to