This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e4cd1463a [spark] Bump Spark version 3.4.2 (#2838)
e4cd1463a is described below
commit e4cd1463ad39836a0a56f7fe70de9cfd4475c47f
Author: cxzl25 <[email protected]>
AuthorDate: Thu Feb 22 21:14:47 2024 +0800
[spark] Bump Spark version 3.4.2 (#2838)
---
.../paimon/spark/sql/PaimonOptimizationTest.scala | 7 ++-
paimon-spark/paimon-spark-3.4/pom.xml | 2 +-
.../paimon/spark/sql/PaimonOptimizationTest.scala | 7 ++-
.../paimon/spark/sql/PaimonOptimizationTest.scala | 7 ++-
.../MergePaimonScalarSubqueriersBase.scala | 8 +++-
.../paimon/spark/util/CTERelationRefUtils.scala | 52 ++++++++++++++++++++++
6 files changed, 74 insertions(+), 9 deletions(-)
diff --git
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
index 6e4787627..0a4dfb769 100644
---
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
+++
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -18,9 +18,10 @@
package org.apache.paimon.spark.sql
+import org.apache.paimon.spark.util.CTERelationRefUtils
+
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField,
NamedExpression, ScalarSubquery}
-import org.apache.spark.sql.catalyst.plans.logical.CTERelationRef
class PaimonOptimizationTest extends PaimonOptimizationTestBase {
@@ -28,7 +29,9 @@ class PaimonOptimizationTest extends
PaimonOptimizationTestBase {
cteIndex: Int,
output: Seq[Attribute],
fieldIndex: Int): NamedExpression = {
- GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true,
output)), fieldIndex)
+ GetStructField(
+ ScalarSubquery(CTERelationRefUtils.createCTERelationRef(cteIndex,
_resolved = true, output)),
+ fieldIndex)
.as("scalarsubquery()")
}
}
diff --git a/paimon-spark/paimon-spark-3.4/pom.xml
b/paimon-spark/paimon-spark-3.4/pom.xml
index bfbfb2db1..8fadb8ab0 100644
--- a/paimon-spark/paimon-spark-3.4/pom.xml
+++ b/paimon-spark/paimon-spark-3.4/pom.xml
@@ -32,7 +32,7 @@ under the License.
<name>Paimon : Spark : 3.4</name>
<properties>
- <spark.version>3.4.1</spark.version>
+ <spark.version>3.4.2</spark.version>
</properties>
<dependencies>
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
index 270ec57d6..f5ee0706c 100644
---
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -18,8 +18,9 @@
package org.apache.paimon.spark.sql
+import org.apache.paimon.spark.util.CTERelationRefUtils
+
import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField,
NamedExpression, ScalarSubquery}
-import org.apache.spark.sql.catalyst.plans.logical.CTERelationRef
class PaimonOptimizationTest extends PaimonOptimizationTestBase {
@@ -29,7 +30,9 @@ class PaimonOptimizationTest extends
PaimonOptimizationTestBase {
cteIndex: Int,
output: Seq[Attribute],
fieldIndex: Int): NamedExpression = {
- GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true,
output)), fieldIndex)
+ GetStructField(
+ ScalarSubquery(CTERelationRefUtils.createCTERelationRef(cteIndex,
_resolved = true, output)),
+ fieldIndex)
.as("scalarsubquery()")
}
}
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
index 6e4787627..0a4dfb769 100644
---
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
+++
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTest.scala
@@ -18,9 +18,10 @@
package org.apache.paimon.spark.sql
+import org.apache.paimon.spark.util.CTERelationRefUtils
+
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Attribute, GetStructField,
NamedExpression, ScalarSubquery}
-import org.apache.spark.sql.catalyst.plans.logical.CTERelationRef
class PaimonOptimizationTest extends PaimonOptimizationTestBase {
@@ -28,7 +29,9 @@ class PaimonOptimizationTest extends
PaimonOptimizationTestBase {
cteIndex: Int,
output: Seq[Attribute],
fieldIndex: Int): NamedExpression = {
- GetStructField(ScalarSubquery(CTERelationRef(cteIndex, _resolved = true,
output)), fieldIndex)
+ GetStructField(
+ ScalarSubquery(CTERelationRefUtils.createCTERelationRef(cteIndex,
_resolved = true, output)),
+ fieldIndex)
.as("scalarsubquery()")
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
index 2bc422525..49cd5e907 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
@@ -19,10 +19,11 @@
package org.apache.paimon.spark.catalyst.optimizer
import org.apache.paimon.spark.PaimonScan
+import org.apache.paimon.spark.util.CTERelationRefUtils
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeMap, CreateNamedStruct, Expression, ExprId, GetStructField,
LeafExpression, Literal, NamedExpression, PredicateHelper, ScalarSubquery,
Unevaluable}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationDef,
CTERelationRef, Filter, Join, LogicalPlan, Project, Subquery, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationDef,
Filter, Join, LogicalPlan, Project, Subquery, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{SCALAR_SUBQUERY,
SCALAR_SUBQUERY_REFERENCE, TreePattern}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
@@ -361,7 +362,10 @@ trait MergePaimonScalarSubqueriersBase extends
Rule[LogicalPlan] with PredicateH
val subqueryCTE = header.plan.asInstanceOf[CTERelationDef]
GetStructField(
createScalarSubquery(
- CTERelationRef(subqueryCTE.id, _resolved = true,
subqueryCTE.output),
+ CTERelationRefUtils.createCTERelationRef(
+ subqueryCTE.id,
+ _resolved = true,
+ subqueryCTE.output),
ssr.exprId),
ssr.headerIndex)
} else {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/CTERelationRefUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/CTERelationRefUtils.scala
new file mode 100644
index 000000000..8b8abe7c6
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/CTERelationRefUtils.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.CTERelationRef
+
+/** Creates [[CTERelationRef]] nodes reflectively so both constructor shapes are supported. */
+object CTERelationRefUtils {
+
+  import scala.reflect.runtime.{universe => ru}
+
+  // Looked up once at object initialisation: the reflected constructor, plus whether its first
+  // parameter list contains `isStreaming` (SPARK-46062 add isStreaming param).
+  private val (constructor, acceptsIsStreaming) = {
+    val refType = ru.typeOf[CTERelationRef]
+    val ctorSymbol = refType.decl(ru.termNames.CONSTRUCTOR).asMethod
+    val classMirror =
+      ru.runtimeMirror(getClass.getClassLoader).reflectClass(refType.typeSymbol.asClass)
+    val streamingDeclared =
+      ctorSymbol.paramLists.head.exists(p => p.name.encodedName.toString == "isStreaming")
+    (classMirror.reflectConstructor(ctorSymbol), streamingDeclared)
+  }
+
+  // Invokes the reflected constructor. When it declares `isStreaming`, `false` is supplied for
+  // it; in both shapes the trailing argument is `None`.
+  def createCTERelationRef(
+      cteId: Long,
+      _resolved: Boolean,
+      output: Seq[Attribute]): CTERelationRef = {
+    val ctorArgs: Seq[Any] =
+      if (acceptsIsStreaming) Seq(cteId, _resolved, output, false, None)
+      else Seq(cteId, _resolved, output, None)
+    constructor(ctorArgs: _*).asInstanceOf[CTERelationRef]
+  }
+}