This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b92265a98f2 [SPARK-45685][CORE][SQL] Use `LazyList` instead of `Stream`
b92265a98f2 is described below
commit b92265a98f241b333467a02f4fffc9889ad3e7da
Author: yangjie01 <[email protected]>
AuthorDate: Sun Oct 29 22:43:54 2023 -0700
[SPARK-45685][CORE][SQL] Use `LazyList` instead of `Stream`
### What changes were proposed in this pull request?
This PR changes the code to use `LazyList` instead of `Stream`, because `Stream` has been deprecated since Scala 2.13.0. The relevant deprecations are quoted below, followed by a short migration sketch.
- `class Stream`
```scala
deprecated("Use LazyList (which is fully lazy) instead of Stream (which has
a lazy tail only)", "2.13.0")
SerialVersionUID(3L)
sealed abstract class Stream[+A] extends AbstractSeq[A]
with LinearSeq[A]
with LinearSeqOps[A, Stream, Stream[A]]
with IterableFactoryDefaults[A, Stream]
with Serializable {
...
deprecated("The `append` operation has been renamed `lazyAppendedAll`",
"2.13.0")
inline final def append[B >: A](rest: => IterableOnce[B]): Stream[B] =
lazyAppendedAll(rest)
```
- `object Stream`
```scala
deprecated("Use LazyList (which is fully lazy) instead of Stream (which has
a lazy tail only)", "2.13.0")
SerialVersionUID(3L)
object Stream extends SeqFactory[Stream] {
```
- `type Stream` and `val Stream`
```scala
deprecated("Use LazyList instead of Stream", "2.13.0")
type Stream[+A] = scala.collection.immutable.Stream[A]
deprecated("Use LazyList instead of Stream", "2.13.0")
val Stream = scala.collection.immutable.Stream
```
- method `toStream` in trait `IterableOnceOps`
```scala
deprecated("Use .to(LazyList) instead of .toStream", "2.13.0")
inline final def toStream: immutable.Stream[A] = to(immutable.Stream)
```
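The replacements throughout the diff follow the standard Scala 2.13 migration for these APIs. Below is a minimal, illustrative sketch of the substitution patterns; the object and value names are made up for the example and do not appear in this PR:
```scala
// Sketch of the Stream -> LazyList substitutions (Scala 2.13+), illustrative only.
object StreamToLazyListSketch extends App {
  // Stream.continually(...)  ->  LazyList.continually(...)
  val repeated = LazyList.continually(Seq("a", "b", "c")).flatten.take(5).toArray

  // xs.toStream              ->  xs.to(LazyList)
  val lazified = (1 to 10).to(LazyList)

  // stream.append(rest)      ->  lazyList.lazyAppendedAll(rest)
  val appended = LazyList(1, 2).lazyAppendedAll(LazyList(3, 4))

  // Stream(...)              ->  LazyList(...); `force` still materializes every element
  val forced = LazyList(1, 2, 3).map(_ * 2).force

  println((repeated.toSeq, lazified.take(3).toList, appended.toList, forced.toList))
}
```
One behavioral difference to keep in mind: `Stream` evaluates its head eagerly (only the tail is lazy), whereas `LazyList` is lazy in both head and tail, so element evaluation (and any side effects) is deferred until the collection is traversed or `force`d.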
### Why are the changes needed?
Clean up deprecated Scala API usage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43563 from LuciferYang/stream-2-lazylist.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../input/WholeTextFileInputFormatSuite.scala | 2 +-
.../input/WholeTextFileRecordReaderSuite.scala | 2 +-
.../spark/storage/FlatmapIteratorSuite.scala | 6 ++--
.../expressions/collectionOperations.scala | 2 +-
.../plans/AliasAwareOutputExpression.scala | 4 +--
.../spark/sql/catalyst/plans/QueryPlan.scala | 2 +-
.../apache/spark/sql/catalyst/trees/TreeNode.scala | 42 +++++++++++-----------
.../sql/catalyst/plans/LogicalPlanSuite.scala | 4 +--
.../spark/sql/catalyst/trees/TreeNodeSuite.scala | 36 +++++++++----------
.../spark/sql/RelationalGroupedDataset.scala | 4 +--
.../sql/execution/AliasAwareOutputExpression.scala | 2 +-
.../sql/execution/WholeStageCodegenExec.scala | 2 +-
.../execution/joins/BroadcastHashJoinExec.scala | 2 +-
.../apache/spark/sql/DataFrameAggregateSuite.scala | 2 +-
.../spark/sql/DataFrameWindowFunctionsSuite.scala | 2 +-
.../scala/org/apache/spark/sql/GenTPCDSData.scala | 13 ++++---
.../apache/spark/sql/GeneratorFunctionSuite.scala | 2 +-
.../scala/org/apache/spark/sql/JoinSuite.scala | 4 +--
.../apache/spark/sql/execution/PlannerSuite.scala | 2 +-
.../org/apache/spark/sql/execution/SortSuite.scala | 2 +-
.../sql/execution/WholeStageCodegenSuite.scala | 4 +--
21 files changed, 70 insertions(+), 71 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
index 417e711e9c0..26c1be259ad 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
@@ -83,6 +83,6 @@ object WholeTextFileInputFormatSuite {
private val fileLengths = Array(10, 100, 1000)
private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
- filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+ filename -> LazyList.continually(testWords.toList.to(LazyList)).flatten.take(upperBound).toArray
}.toMap
}
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index c833d22b3be..e64ebe2a551 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -145,6 +145,6 @@ object WholeTextFileRecordReaderSuite {
private val fileLengths = Array(10, 100, 1000)
private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
- filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+ filename -> LazyList.continually(testWords.toList.to(LazyList)).flatten.take(upperBound).toArray
}.toMap
}
diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
index e719c722d01..cb59766b08a 100644
--- a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
@@ -36,7 +36,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(sconf)
val expand_size = 100
val data = sc.parallelize((1 to 5).toSeq).
- flatMap( x => Stream.range(0, expand_size))
+ flatMap( x => LazyList.range(0, expand_size))
val persisted = data.persist(StorageLevel.DISK_ONLY)
assert(persisted.count()===500)
assert(persisted.filter(_==1).count()===5)
@@ -47,7 +47,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(sconf)
val expand_size = 100
val data = sc.parallelize((1 to 5).toSeq).
- flatMap(x => Stream.range(0, expand_size))
+ flatMap(x => LazyList.range(0, expand_size))
val persisted = data.persist(StorageLevel.MEMORY_ONLY)
assert(persisted.count()===500)
assert(persisted.filter(_==1).count()===5)
@@ -59,7 +59,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(sconf)
val expand_size = 500
val data = sc.parallelize(Seq(1, 2)).
- flatMap(x => Stream.range(1, expand_size).
+ flatMap(x => LazyList.range(1, expand_size).
map(y => "%d: string test %d".format(y, x)))
val persisted = data.persist(StorageLevel.MEMORY_ONLY_SER)
assert(persisted.filter(_.startsWith("1:")).count()===2)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 0a080423b10..e45f0d72a5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2650,7 +2650,7 @@ case class Concat(children: Seq[Expression]) extends ComplexTypeMergingExpressio
}
case ArrayType(elementType, _) =>
input => {
- val inputs = children.toStream.map(_.eval(input))
+ val inputs = children.to(LazyList).map(_.eval(input))
if (inputs.contains(null)) {
null
} else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index 2cca7b844cc..e1a9e8b5ea8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -67,7 +67,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
/**
* Return a stream of expressions in which the original expression is projected with `aliasMap`.
*/
- protected def projectExpression(expr: Expression): Stream[Expression] = {
+ protected def projectExpression(expr: Expression): LazyList[Expression] = {
val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
expr.multiTransformDown {
// Mapping with aliases
@@ -103,7 +103,7 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
// but if only `b AS y` can be projected we can't return `Seq(SortOrder(y))`.
orderingExpressions.iterator.map { sortOrder =>
val orderingSet = mutable.Set.empty[Expression]
- val sameOrderings = sortOrder.children.toStream
+ val sameOrderings = sortOrder.children.to(LazyList)
.flatMap(projectExpression)
.filter(e => orderingSet.add(e.canonicalized))
.take(aliasCandidateLimit)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 36bdc4c5b02..c58a1c4c0f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -220,7 +220,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
case Some(value) => Some(recursiveTransform(value))
case m: Map[_, _] => m
case d: DataType => d // Avoid unpacking Structs
- case stream: Stream[_] => stream.map(recursiveTransform).force
+ case stream: LazyList[_] => stream.map(recursiveTransform).force
case seq: Iterable[_] => seq.map(recursiveTransform)
case other: AnyRef => other
case null => null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 0e5dd301953..99dbda4027f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -356,8 +356,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
val newArgs = mapProductIterator {
case s: StructType => s // Don't convert struct types to some other type of Seq[StructField]
// Handle Seq[TreeNode] in TreeNode parameters.
- case s: Stream[_] =>
- // Stream is lazy so we need to force materialization
+ case s: LazyList[_] =>
+ // LazyList is lazy so we need to force materialization
s.map(mapChild).force
case s: Seq[_] =>
s.map(mapChild)
@@ -557,7 +557,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
* @return the stream of alternatives
*/
def multiTransformDown(
- rule: PartialFunction[BaseType, Seq[BaseType]]): Stream[BaseType] = {
+ rule: PartialFunction[BaseType, Seq[BaseType]]): LazyList[BaseType] = {
multiTransformDownWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule)
}
@@ -567,11 +567,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
*
* As it is very easy to generate enormous number of alternatives when the input tree is huge or
* when the rule returns many alternatives for many nodes, this function returns the alternatives
- * as a lazy `Stream` to be able to limit the number of alternatives generated at the caller side
- * as needed.
+ * as a lazy `LazyList` to be able to limit the number of alternatives generated at the caller
+ * side as needed.
*
* The purpose of this function to access the returned alternatives by the rule only if they are
- * needed so the rule can return a `Stream` whose elements are also lazily calculated.
+ * needed so the rule can return a `LazyList` whose elements are also lazily calculated.
* E.g. `multiTransform*` calls can be nested with the help of
* `MultiTransform.generateCartesianProduct()`.
*
@@ -579,7 +579,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
* the original node without any transformation is a valid alternative.
*
* The rule can return `Seq.empty` to indicate that the original node should be pruned. In this
- * case `multiTransform()` returns an empty `Stream`.
+ * case `multiTransform()` returns an empty `LazyList`.
*
* Please consider the following examples of `input.multiTransformDown(rule)`:
*
@@ -593,7 +593,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
* `Add(a, b)` => `Seq(11, 12, 21, 22)`
*
* The output is:
- * `Stream(11, 12, 21, 22)`
+ * `LazyList(11, 12, 21, 22)`
*
* 2.
* In the previous example if we want to generate alternatives of `a` and `b` too then we need to
@@ -603,7 +603,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
* `Add(a, b)` => `Seq(11, 12, 21, 22, Add(a, b))`
*
* The output is:
- * `Stream(11, 12, 21, 22, Add(1, 10), Add(2, 10), Add(1, 20), Add(2, 20))`
+ * `LazyList(11, 12, 21, 22, Add(1, 10), Add(2, 10), Add(1, 20), Add(2, 20))`
*
* @param rule a function used to generate alternatives for a node
* @param cond a Lambda expression to prune tree traversals. If `cond.apply` returns false
@@ -619,15 +619,15 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
def multiTransformDownWithPruning(
cond: TreePatternBits => Boolean,
ruleId: RuleId = UnknownRuleId
- )(rule: PartialFunction[BaseType, Seq[BaseType]]): Stream[BaseType] = {
+ )(rule: PartialFunction[BaseType, Seq[BaseType]]): LazyList[BaseType] = {
if (!cond.apply(this) || isRuleIneffective(ruleId)) {
- return Stream(this)
+ return LazyList(this)
}
// We could return `Seq(this)` if the `rule` doesn't apply and handle both
// - the doesn't apply
// - and the rule returns a one element `Seq(originalNode)`
- // cases together. The returned `Seq` can be a `Stream` and unfortunately it doesn't seem like
+ // cases together. The returned `Seq` can be a `LazyList` and unfortunately it doesn't seem like
// there is a way to match on a one element stream without eagerly computing the tail's head.
// This contradicts with the purpose of only taking the necessary elements from the
// alternatives. I.e. the "multiTransformDown is lazy" test case in `TreeNodeSuite` would fail.
@@ -641,18 +641,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
})
}
- val afterRulesStream = if (afterRules.isEmpty) {
+ val afterRulesLazyList = if (afterRules.isEmpty) {
if (ruleApplied) {
// If the rule returned with empty alternatives then prune
- Stream.empty
+ LazyList.empty
} else {
// If the rule was not applied then keep the original node
this.markRuleAsIneffective(ruleId)
- Stream(this)
+ LazyList(this)
}
} else {
// If the rule was applied then use the returned alternatives
- afterRules.toStream.map { afterRule =>
+ afterRules.to(LazyList).map { afterRule =>
if (this fastEquals afterRule) {
this
} else {
@@ -662,13 +662,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
}
}
- afterRulesStream.flatMap { afterRule =>
+ afterRulesLazyList.flatMap { afterRule =>
if (afterRule.containsChild.nonEmpty) {
MultiTransform.generateCartesianProduct(
afterRule.children.map(c => () => c.multiTransformDownWithPruning(cond, ruleId)(rule)))
.map(afterRule.withNewChildren)
} else {
- Stream(afterRule)
+ LazyList(afterRule)
}
}
}
@@ -792,7 +792,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
case other => other
}.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
case d: DataType => d // Avoid unpacking Structs
- case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
+ case args: LazyList[_] => args.map(mapChild).force // Force materialization on stream
case args: Iterable[_] => args.map(mapChild)
case nonChild: AnyRef => nonChild
case null => null
@@ -1321,8 +1321,8 @@ object MultiTransform {
* @param elementSeqs a list of sequences to build the cartesian product from
* @return the stream of generated `Seq` elements
*/
- def generateCartesianProduct[T](elementSeqs: Seq[() => Seq[T]]): Stream[Seq[T]] = {
- elementSeqs.foldRight(Stream(Seq.empty[T]))((elements, elementTails) =>
+ def generateCartesianProduct[T](elementSeqs: Seq[() => Seq[T]]): LazyList[Seq[T]] = {
+ elementSeqs.foldRight(LazyList(Seq.empty[T]))((elements, elementTails) =>
for {
elementTail <- elementTails
element <- elements()
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index 8266d30d055..ea0fcac881c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -84,7 +84,7 @@ class LogicalPlanSuite extends SparkFunSuite {
test("transformExpressions works with a Stream") {
val id1 = NamedExpression.newExprId
val id2 = NamedExpression.newExprId
- val plan = Project(Stream(
+ val plan = Project(LazyList(
Alias(Literal(1), "a")(exprId = id1),
Alias(Literal(2), "b")(exprId = id2)),
OneRowRelation())
@@ -92,7 +92,7 @@ class LogicalPlanSuite extends SparkFunSuite {
case Literal(v: Int, IntegerType) if v != 1 =>
Literal(v + 1, IntegerType)
}
- val expected = Project(Stream(
+ val expected = Project(LazyList(
Alias(Literal(1), "a")(exprId = id1),
Alias(Literal(3), "b")(exprId = id2)),
OneRowRelation())
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 33ba4f05972..4dbadef93a0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -693,7 +693,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
}
test("transform works on stream of children") {
- val before = Coalesce(Stream(Literal(1), Literal(2)))
+ val before = Coalesce(LazyList(Literal(1), Literal(2)))
// Note it is a bit tricky to exhibit the broken behavior. Basically we want to create the
// situation in which the TreeNode.mapChildren function's change detection is not triggered. A
// stream's first element is typically materialized, so in order to not trip the TreeNode change
@@ -702,14 +702,14 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
case Literal(v: Int, IntegerType) if v != 1 =>
Literal(v + 1, IntegerType)
}
- val expected = Coalesce(Stream(Literal(1), Literal(3)))
+ val expected = Coalesce(LazyList(Literal(1), Literal(3)))
assert(result === expected)
}
test("withNewChildren on stream of children") {
- val before = Coalesce(Stream(Literal(1), Literal(2)))
- val result = before.withNewChildren(Stream(Literal(1), Literal(3)))
- val expected = Coalesce(Stream(Literal(1), Literal(3)))
+ val before = Coalesce(LazyList(Literal(1), Literal(2)))
+ val result = before.withNewChildren(LazyList(Literal(1), Literal(3)))
+ val expected = Coalesce(LazyList(Literal(1), Literal(3)))
assert(result === expected)
}
@@ -949,9 +949,9 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
}
}
- private def newErrorAfterStream(es: Expression*) = {
- es.toStream.append(
- throw new NoSuchElementException("Stream should not return more
elements")
+ private def newErrorAfterLazyList(es: Expression*) = {
+ es.to(LazyList).lazyAppendedAll(
+ throw new NoSuchElementException("LazyList should not return more
elements")
)
}
@@ -975,8 +975,8 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
val e = Add(Add(Literal("a"), Literal("b")), Add(Literal("c"), Literal("d")))
val transformed = e.multiTransformDown {
case StringLiteral("a") => Seq(Literal(1), Literal(2), Literal(3))
- case StringLiteral("b") => newErrorAfterStream(Literal(10))
- case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterStream(Literal(100))
+ case StringLiteral("b") => newErrorAfterLazyList(Literal(10))
+ case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterLazyList(Literal(100))
}
val expected = for {
a <- Seq(Literal(1), Literal(2), Literal(3))
@@ -990,7 +990,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
val transformed2 = e.multiTransformDown {
case StringLiteral("a") => Seq(Literal(1), Literal(2), Literal(3))
case StringLiteral("b") => Seq(Literal(10), Literal(20), Literal(30))
- case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterStream(Literal(100))
+ case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterLazyList(Literal(100))
}
val expected2 = for {
b <- Seq(Literal(10), Literal(20), Literal(30))
@@ -1055,7 +1055,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
test("multiTransformDown alternatives are generated only if needed") {
val e = Add(Add(Literal("a"), Literal("b")), Add(Literal("c"), Literal("d")))
val transformed = e.multiTransformDown {
- case StringLiteral("a") => newErrorAfterStream()
+ case StringLiteral("a") => newErrorAfterLazyList()
case StringLiteral("b") => Seq.empty
}
assert(transformed.isEmpty)
@@ -1075,10 +1075,10 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
a_or_b.getOrElse(
// Besides returning the alternatives for the first encounter, also set up a mechanism to
// update the cache when the new alternatives are requested.
- Stream(Literal(1), Literal(2)).map { x =>
+ LazyList(Literal(1), Literal(2)).map { x =>
a_or_b = Some(Seq(x))
x
- }.append {
+ }.lazyAppendedAll {
a_or_b = None
Seq.empty
})
@@ -1094,19 +1094,19 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
val transformed2 = e.multiTransformDown {
case StringLiteral("a") | StringLiteral("b") =>
a_or_b.getOrElse(
- Stream(Literal(1), Literal(2)).map { x =>
+ LazyList(Literal(1), Literal(2)).map { x =>
a_or_b = Some(Seq(x))
x
- }.append {
+ }.lazyAppendedAll {
a_or_b = None
Seq.empty
})
case StringLiteral("c") | StringLiteral("d") =>
c_or_d.getOrElse(
- Stream(Literal(10), Literal(20)).map { x =>
+ LazyList(Literal(10), Literal(20)).map { x =>
c_or_d = Some(Seq(x))
x
- }.append {
+ }.lazyAppendedAll {
c_or_d = None
Seq.empty
})
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 27093ced751..7e15c0baf52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -59,8 +59,8 @@ class RelationalGroupedDataset protected[sql](
private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {
groupingExprs match {
- // call `toList` because `Stream` can't serialize in scala 2.13
- case s: Stream[Expression] => s.toList ++ aggExprs
+ // call `toList` because `LazyList` can't serialize in scala 2.13
+ case s: LazyList[Expression] => s.toList ++ aggExprs
case other => other ++ aggExprs
}
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
index e1dcab80af3..67d2849e005 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
@@ -42,7 +42,7 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
projectExpression(e)
.filter(e => partitioningSet.add(e.canonicalized))
.take(aliasCandidateLimit)
- .asInstanceOf[Stream[Partitioning]]
+ .asInstanceOf[LazyList[Partitioning]]
case o => Seq(o)
}
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 40de623f73d..0aefb0649d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -164,7 +164,7 @@ trait CodegenSupport extends SparkPlan {
}
val inputVars = inputVarsCandidate match {
- case stream: Stream[ExprCode] => stream.force
+ case stream: LazyList[ExprCode] => stream.force
case other => other
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 9f9f8743146..68022757ff2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -115,7 +115,7 @@ case class BroadcastHashJoinExec(
PartitioningCollection(partitioning.multiTransformDown {
case e: Expression if streamedKeyToBuildKeyMapping.contains(e.canonicalized) =>
e +: streamedKeyToBuildKeyMapping(e.canonicalized)
- }.asInstanceOf[Stream[HashPartitioning]]
+ }.asInstanceOf[LazyList[HashPartitioning]]
.take(conf.broadcastHashJoinOutputPartitioningExpandLimit))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index ccfdddd039d..f34d7cf3680 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1617,7 +1617,7 @@ class DataFrameAggregateSuite extends QueryTest
}
test("SPARK-38221: group by stream of complex expressions should not fail") {
- val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2):
_*).sum("id")
+ val df = Seq(1).toDF("id").groupBy(LazyList($"id" + 1, $"id" + 2):
_*).sum("id")
checkAnswer(df, Row(2, 3, 1))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 9454acba87b..84133eb485f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1232,7 +1232,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
).toDF("a", "b", "c")
val w = Window.partitionBy("a").orderBy("b")
- val selectExprs = Stream(
+ val selectExprs = LazyList(
sum("c").over(w.rowsBetween(Window.unboundedPreceding,
Window.currentRow)).as("sumc"),
avg("c").over(w.rowsBetween(Window.unboundedPreceding,
Window.currentRow)).as("avgc")
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
index 8556ccf74b7..6768c5fd07b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
import java.util.concurrent.LinkedBlockingQueue
-import scala.collection.immutable.Stream
import scala.sys.process._
import scala.util.Try
@@ -46,7 +45,7 @@ object BlockingLineStream {
private final class BlockingStreamed[T](
val process: T => Unit,
val done: Int => Unit,
- val stream: () => Stream[T])
+ val stream: () => LazyList[T])
// See scala.sys.process.Streamed
private object BlockingStreamed {
@@ -57,11 +56,11 @@ object BlockingLineStream {
def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
- def next(): Stream[T] = q.take match {
- case Left(0) => Stream.empty
+ def next(): LazyList[T] = q.take match {
+ case Left(0) => LazyList.empty
case Left(code) =>
- if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
- case Right(s) => Stream.cons(s, next())
+ if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else LazyList.empty
+ case Right(s) => LazyList.cons(s, next())
}
new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
@@ -79,7 +78,7 @@ object BlockingLineStream {
}
}
- def apply(command: Seq[String]): Stream[String] = {
+ def apply(command: Seq[String]): LazyList[String] = {
val streamed = BlockingStreamed[String](true)
val process = command.run(BasicIO(false, streamed.process, None))
Spawn(streamed.done(process.exitValue()))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index afc152e072a..9b4ad768818 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -457,7 +457,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
test("SPARK-38528: generator in stream of aggregate expressions") {
val df = Seq(1, 2, 3).toDF("v")
checkAnswer(
- df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
+ df.select(LazyList(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
Row(1, 6) :: Row(3, 6) :: Nil)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 8c293bab641..c41b85f75e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1714,7 +1714,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val dsA = Seq((1, "a")).toDF("id", "c1")
val dsB = Seq((2, "b")).toDF("id", "c2")
val dsC = Seq((3, "c")).toDF("id", "c3")
- val joined = dsA.join(dsB, Stream("id"), "full_outer").join(dsC, Stream("id"), "full_outer")
+ val joined = dsA.join(dsB, LazyList("id"), "full_outer").join(dsC, LazyList("id"), "full_outer")
val expected = Seq(Row(1, "a", null, null), Row(2, null, "b", null),
Row(3, null, null, "c"))
@@ -1723,7 +1723,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with invalid access") {
val ds = Seq((1, "a")).toDF("id", "c1")
- val joined = ds.join(ds, Stream("id"), "full_outer").join(ds, Stream("id"), "full_outer")
+ val joined = ds.join(ds, LazyList("id"), "full_outer").join(ds, LazyList("id"), "full_outer")
val expected = Seq(Row(1, "a", "a", "a"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index de24b8c82b0..c5b1e68fb91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -743,7 +743,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
test("SPARK-24500: create union with stream of children") {
- val df = Union(Stream(
+ val df = Union(LazyList(
Range(1, 1, 1, 1),
Range(1, 2, 1, 1)))
df.queryExecution.executedPlan.execute()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 03e56cb9532..4cb9ae7cbc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -108,7 +108,7 @@ class SortSuite extends SparkPlanTest with SharedSparkSession {
)
checkAnswer(
input.toDF("a", "b", "c"),
- (child: SparkPlan) => SortExec(Stream($"a".asc, $"b".asc, $"c".asc),
+ (child: SparkPlan) => SortExec(LazyList($"a".asc, $"b".asc, $"c".asc),
global = true, child = child),
input.sortBy(t => (t._1, t._2, t._3)).map(Row.fromTuple),
sortAnswers = false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 08db88b0224..146a583d773 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -776,14 +776,14 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
val b = Seq((1, "a")).toDF("key", "value")
val c = Seq(1).toDF("key")
- val ab = a.join(b, Stream("key"), "left")
+ val ab = a.join(b, LazyList("key"), "left")
val abc = ab.join(c, Seq("key"), "left")
checkAnswer(abc, Row(1, "a"))
}
test("SPARK-26680: Stream in groupBy does not cause StackOverflowError") {
- val groupByCols = Stream(col("key"))
+ val groupByCols = LazyList(col("key"))
val df = Seq((1, 2), (2, 3), (1, 3)).toDF("key", "value")
.groupBy(groupByCols: _*)
.max("value")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]