This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 1760bf6f IGNITE-24471 Support Spark 3.5 (#310)
1760bf6f is described below
commit 1760bf6f1a96e4e9f41e87a2b5c64d4402e58d58
Author: Maksim Myskov <[email protected]>
AuthorDate: Tue Jun 17 16:57:05 2025 +0300
IGNITE-24471 Support Spark 3.5 (#310)
---
modules/spark-ext/examples/pom.xml | 5 -----
modules/spark-ext/pom.xml | 4 ++--
modules/spark-ext/spark/pom.xml | 6 ++++++
.../ignite/spark/impl/optimization/DateExpressions.scala | 6 +++---
.../ignite/spark/impl/optimization/MathExpressions.scala | 4 ++--
.../ignite/spark/impl/optimization/SystemExpressions.scala | 8 +-------
.../main/scala/org/apache/ignite/spark/impl/package.scala | 2 +-
.../org/apache/spark/sql/ignite/IgniteSparkSession.scala | 13 +++++++------
.../scala/org/apache/ignite/spark/IgniteCatalogSpec.scala | 2 +-
9 files changed, 23 insertions(+), 27 deletions(-)
diff --git a/modules/spark-ext/examples/pom.xml b/modules/spark-ext/examples/pom.xml
index 677b2edc..442da481 100644
--- a/modules/spark-ext/examples/pom.xml
+++ b/modules/spark-ext/examples/pom.xml
@@ -92,11 +92,6 @@
<artifactId>log4j-core</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- </dependency>
-
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
diff --git a/modules/spark-ext/pom.xml b/modules/spark-ext/pom.xml
index 9fc038f5..69284d3c 100644
--- a/modules/spark-ext/pom.xml
+++ b/modules/spark-ext/pom.xml
@@ -36,8 +36,8 @@
<properties>
<scala.library.version>2.12.16</scala.library.version>
<scala.test.version>3.2.12</scala.test.version>
- <spark.version>3.2.2</spark.version>
- <spark.jackson.version>2.12.7</spark.jackson.version>
+ <spark.version>3.5.6</spark.version>
+ <spark.jackson.version>2.15.4</spark.jackson.version>
</properties>
<modules>
diff --git a/modules/spark-ext/spark/pom.xml b/modules/spark-ext/spark/pom.xml
index cca080f9..7a72f25c 100644
--- a/modules/spark-ext/spark/pom.xml
+++ b/modules/spark-ext/spark/pom.xml
@@ -112,6 +112,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${spark.jackson.version}</version>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
diff --git a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
index 156d4fa2..d0030a8c 100644
--- a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
+++ b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
@@ -52,8 +52,8 @@ private[optimization] object DateExpressions extends SupportedExpressions {
case Month(date) ⇒
checkChild(date)
- case ParseToDate(left, format, child) ⇒
- checkChild(left) && (format.isEmpty || checkChild(format.get)) && checkChild(child)
+ case ParseToDate(left, format, _, _) ⇒
+ checkChild(left) && (format.isEmpty || checkChild(format.get))
case Quarter(date) ⇒
checkChild(date)
@@ -101,7 +101,7 @@ private[optimization] object DateExpressions extends SupportedExpressions {
case Month(date) ⇒
Some(s"MINUTE(${childToString(date)})")
- case ParseToDate(left, formatOption, _) ⇒
+ case ParseToDate(left, formatOption, _, _) ⇒
formatOption match {
case Some(format) ⇒
Some(s"PARSEDATETIME(${childToString(left)},
${childToString(format)})")
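
For context on the ParseToDate hunks: in Spark 3.4+ the case class gained two extra constructor members (a time-zone id and an ANSI flag, per current Spark sources), so the extractor binds four values and the old three-field pattern no longer compiles. A minimal sketch of the new match shape, with an illustrative helper name rather than the extension's actual API:

    import org.apache.spark.sql.catalyst.expressions.{Expression, ParseToDate}

    // The two trailing fields (assumed: time-zone id, ANSI flag) do not affect
    // whether the expression can be pushed down, so they are discarded with `_`.
    def supportsParseToDate(expr: Expression, checkChild: Expression => Boolean): Boolean =
        expr match {
            case ParseToDate(left, format, _, _) =>
                checkChild(left) && (format.isEmpty || checkChild(format.get))
            case _ => false
        }
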
diff --git a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
index 99386ac4..757e680d 100644
--- a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
+++ b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
@@ -106,7 +106,7 @@ private[optimization] object MathExpressions extends SupportedExpressions {
case Rand(child, _) ⇒
checkChild(child)
- case Round(child, scale) ⇒
+ case Round(child, scale, _) ⇒
checkChild(child) && checkChild(scale)
case Signum(child) ⇒
@@ -230,7 +230,7 @@ private[optimization] object MathExpressions extends SupportedExpressions {
case Rand(child, _) ⇒
Some(s"RAND(${childToString(child)})")
- case Round(child, scale) ⇒
+ case Round(child, scale, _) ⇒
Some(s"ROUND(${childToString(child)}, ${childToString(scale)})")
case Signum(child) ⇒
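
Round follows the same pattern: newer Spark adds a third constructor member (an ANSI-mode flag, as assumed here), and only child and scale are needed to render the SQL. A hedged sketch with an illustrative childToString parameter:

    import org.apache.spark.sql.catalyst.expressions.{Expression, Round}

    // Renders ROUND(child, scale); the third field (assumed ANSI flag) is ignored.
    def roundToSql(expr: Expression, childToString: Expression => String): Option[String] =
        expr match {
            case Round(child, scale, _) =>
                Some(s"ROUND(${childToString(child)}, ${childToString(scale)})")
            case _ => None
        }
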
diff --git a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
index 66cfc714..6fb781e8 100644
--- a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
+++ b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
@@ -18,7 +18,7 @@
package org.apache.ignite.spark.impl.optimization
import org.apache.ignite.IgniteException
-import org.apache.spark.sql.catalyst.expressions.{Coalesce, EqualTo, Expression, Greatest, If, IfNull, IsNotNull, IsNull, Least, Literal, NullIf, Nvl2}
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, EqualTo, Expression, Greatest, If, IsNotNull, IsNull, Least, Literal, NullIf, Nvl2}
/**
* Object to support some built-in expressions like `nvl2` or `coalesce`.
@@ -32,9 +32,6 @@ private[optimization] object SystemExpressions extends SupportedExpressions {
case Greatest(children) ⇒
children.forall(checkChild)
- case IfNull(left, right, _) ⇒
- checkChild(left) && checkChild(right)
-
case Least(children) ⇒
children.forall(checkChild)
@@ -78,9 +75,6 @@ private[optimization] object SystemExpressions extends SupportedExpressions {
case Greatest(children) ⇒
Some(s"GREATEST(${children.map(childToString(_)).mkString(", ")})")
- case IfNull(left, right, _) ⇒
- Some(s"IFNULL(${childToString(left)}, ${childToString(right)})")
-
case Least(children) ⇒
Some(s"LEAST(${children.map(childToString(_)).mkString(", ")})")
diff --git a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
index c41937a3..5ae4d87c 100644
--- a/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
+++ b/modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -17,7 +17,7 @@
package org.apache.ignite.spark
-import org.apache.commons.lang.StringUtils.equalsIgnoreCase
+import org.apache.commons.lang3.StringUtils.equalsIgnoreCase
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.cluster.ClusterNode
import org.apache.ignite.configuration.CacheConfiguration
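
The import moves from commons-lang 2.x (presumably no longer provided by Spark 3.5's classpath) to commons-lang3, and equalsIgnoreCase is a drop-in replacement here. A tiny illustration:

    import org.apache.commons.lang3.StringUtils.equalsIgnoreCase

    // Null-safe, case-insensitive comparison, same semantics as the lang 2.x call.
    val sameTable = equalsIgnoreCase("employeeCache", "EMPLOYEECACHE") // true
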
diff --git a/modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala b/modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
index cddf56c3..481de644 100644
--- a/modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
+++ b/modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal._
@@ -101,12 +102,12 @@ class IgniteSparkSession private(
/** @inheritdoc */
override def emptyDataset[T: Encoder]: Dataset[T] = {
val encoder = implicitly[Encoder[T]]
- new Dataset(self, LocalRelation(encoder.schema.toAttributes), encoder)
+ new Dataset(self, LocalRelation(DataTypeUtils.toAttributes(encoder.schema)), encoder)
}
/** @inheritdoc */
override def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
- Dataset.ofRows(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
+ Dataset.ofRows(self, LocalRelation.fromExternalRows(DataTypeUtils.toAttributes(schema), rows.asScala))
}
/** @inheritdoc */
@@ -141,7 +142,7 @@ class IgniteSparkSession private(
/** @inheritdoc */
override def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = {
val enc = encoderFor[T]
- val attributes = enc.schema.toAttributes
+ val attributes = DataTypeUtils.toAttributes(enc.schema)
val encoded = data.map(d => enc.createSerializer().apply(d))
val plan = new LocalRelation(attributes, encoded)
Dataset[T](self, plan)
@@ -179,7 +180,7 @@ class IgniteSparkSession private(
/** @inheritdoc */
override private[sql] def applySchemaToPythonRDD(rdd: RDD[Array[Any]], schema: StructType) = {
val rowRdd = rdd.map(r => python.EvaluatePython.makeFromJava(schema).asInstanceOf[InternalRow])
- Dataset.ofRows(self, LogicalRDD(schema.toAttributes, rowRdd)(self))
+ Dataset.ofRows(self, LogicalRDD(DataTypeUtils.toAttributes(schema), rowRdd)(self))
}
/** @inheritdoc */
@@ -199,10 +200,10 @@ class IgniteSparkSession private(
override def createDataFrame(rowRDD: RDD[Row],
schema: StructType): DataFrame = {
val catalystRows = {
- val encoder = RowEncoder(schema).createSerializer()
+ val encoder = ExpressionEncoder.apply(schema).createSerializer()
rowRDD.map(encoder.apply)
}
- val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
+ val logicalPlan = LogicalRDD(DataTypeUtils.toAttributes(schema), catalystRows)(self)
Dataset.ofRows(self, logicalPlan)
}
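
Two API moves drive this file's changes: StructType.toAttributes is no longer available, with DataTypeUtils.toAttributes(schema) as the replacement, and ExpressionEncoder(schema) supersedes RowEncoder(schema) for serializing external Rows. A minimal self-contained sketch under Spark 3.5, using a hypothetical schema not taken from the commit:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.types.DataTypeUtils
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val schema = StructType(Seq(StructField("id", IntegerType)))

    // Replacement for the removed schema.toAttributes extension method.
    val attrs: Seq[AttributeReference] = DataTypeUtils.toAttributes(schema)

    // Replacement for RowEncoder(schema): builds a Row => InternalRow serializer.
    val toInternal = ExpressionEncoder(schema).createSerializer()
    val internalRow = toInternal(Row(42))
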
diff --git a/modules/spark-ext/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala b/modules/spark-ext/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
index 8cd134fb..b7bc500b 100644
--- a/modules/spark-ext/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
+++ b/modules/spark-ext/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
@@ -88,7 +88,7 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
it("Should provide the list of all schemas") {
val schemas = igniteSession.catalog.listDatabases().collect()
- schemas.map(_.name).sorted should equal(Array("cache3", "employeeschema", "public"))
+ schemas.map(_.name).sorted should equal(Array("EMPLOYEESCHEMA", "PUBLIC", "cache3"))
}
it("Should provide ability to query SQL table without explicit
registration") {