This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b92265a98f2 [SPARK-45685][CORE][SQL] Use `LazyList` instead of `Stream`
b92265a98f2 is described below

commit b92265a98f241b333467a02f4fffc9889ad3e7da
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun Oct 29 22:43:54 2023 -0700

    [SPARK-45685][CORE][SQL] Use `LazyList` instead of `Stream`
    
    ### What changes were proposed in this pull request?
    This PR changes usages of `Stream` to `LazyList`, because `Stream` has been marked as deprecated since Scala 2.13.0.
    
    - `class Stream`
    
    ```scala
    deprecated("Use LazyList (which is fully lazy) instead of Stream (which has 
a lazy tail only)", "2.13.0")
    SerialVersionUID(3L)
    sealed abstract class Stream[+A] extends AbstractSeq[A]
      with LinearSeq[A]
      with LinearSeqOps[A, Stream, Stream[A]]
      with IterableFactoryDefaults[A, Stream]
      with Serializable {
      ...
      deprecated("The `append` operation has been renamed `lazyAppendedAll`", 
"2.13.0")
      inline final def append[B >: A](rest: => IterableOnce[B]): Stream[B] = 
lazyAppendedAll(rest)
    ```
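
    For illustration only (this sketch is not taken from the patch), the rename maps directly:

    ```scala
    // Deprecated: `Stream` is lazy only in its tail, and `append` was renamed in 2.13.
    val s: Stream[Int] = Stream(1, 2).append(Seq(3))
    // Replacement: `LazyList` is fully lazy; `lazyAppendedAll` evaluates its argument on demand.
    val l: LazyList[Int] = LazyList(1, 2).lazyAppendedAll(Seq(3))
    ```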
    - `object Stream`
    
    ```scala
    deprecated("Use LazyList (which is fully lazy) instead of Stream (which has 
a lazy tail only)", "2.13.0")
    SerialVersionUID(3L)
    object Stream extends SeqFactory[Stream] {
    ```
    - `type Stream` and `val Stream`
    
    ```scala
    deprecated("Use LazyList instead of Stream", "2.13.0")
    type Stream[+A] = scala.collection.immutable.Stream[A]
    deprecated("Use LazyList instead of Stream", "2.13.0")
    val Stream = scala.collection.immutable.Stream
    ```
    - method `toStream` in trait `IterableOnceOps`
    
    ```scala
    deprecated("Use .to(LazyList) instead of .toStream", "2.13.0")
    inline final def toStream: immutable.Stream[A] = to(immutable.Stream)
    ```
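
    Taken together, the mechanical migration pattern applied throughout this patch looks like this (a sketch on illustrative values):

    ```scala
    val xs = List(1, 2, 3)
    val s = xs.toStream      // deprecated since 2.13.0
    val l = xs.to(LazyList)  // the replacement used in this patch
    // Factory calls and combinators migrate the same way:
    val r = LazyList.continually(xs.to(LazyList)).flatten.take(5).toArray
    ```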
    
    ### Why are the changes needed?
    Clean up deprecated Scala API usage.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43563 from LuciferYang/stream-2-lazylist.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../input/WholeTextFileInputFormatSuite.scala      |  2 +-
 .../input/WholeTextFileRecordReaderSuite.scala     |  2 +-
 .../spark/storage/FlatmapIteratorSuite.scala       |  6 ++--
 .../expressions/collectionOperations.scala         |  2 +-
 .../plans/AliasAwareOutputExpression.scala         |  4 +--
 .../spark/sql/catalyst/plans/QueryPlan.scala       |  2 +-
 .../apache/spark/sql/catalyst/trees/TreeNode.scala | 42 +++++++++++-----------
 .../sql/catalyst/plans/LogicalPlanSuite.scala      |  4 +--
 .../spark/sql/catalyst/trees/TreeNodeSuite.scala   | 36 +++++++++----------
 .../spark/sql/RelationalGroupedDataset.scala       |  4 +--
 .../sql/execution/AliasAwareOutputExpression.scala |  2 +-
 .../sql/execution/WholeStageCodegenExec.scala      |  2 +-
 .../execution/joins/BroadcastHashJoinExec.scala    |  2 +-
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  2 +-
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  |  2 +-
 .../scala/org/apache/spark/sql/GenTPCDSData.scala  | 13 ++++---
 .../apache/spark/sql/GeneratorFunctionSuite.scala  |  2 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala     |  4 +--
 .../apache/spark/sql/execution/PlannerSuite.scala  |  2 +-
 .../org/apache/spark/sql/execution/SortSuite.scala |  2 +-
 .../sql/execution/WholeStageCodegenSuite.scala     |  4 +--
 21 files changed, 70 insertions(+), 71 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
index 417e711e9c0..26c1be259ad 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
@@ -83,6 +83,6 @@ object WholeTextFileInputFormatSuite {
   private val fileLengths = Array(10, 100, 1000)
 
   private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
-    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+    filename -> LazyList.continually(testWords.toList.to(LazyList)).flatten.take(upperBound).toArray
   }.toMap
 }
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index c833d22b3be..e64ebe2a551 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -145,6 +145,6 @@ object WholeTextFileRecordReaderSuite {
   private val fileLengths = Array(10, 100, 1000)
 
   private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
-    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+    filename -> LazyList.continually(testWords.toList.to(LazyList)).flatten.take(upperBound).toArray
   }.toMap
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
index e719c722d01..cb59766b08a 100644
--- a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
@@ -36,7 +36,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(sconf)
     val expand_size = 100
     val data = sc.parallelize((1 to 5).toSeq).
-      flatMap( x => Stream.range(0, expand_size))
+      flatMap( x => LazyList.range(0, expand_size))
     val persisted = data.persist(StorageLevel.DISK_ONLY)
     assert(persisted.count()===500)
     assert(persisted.filter(_==1).count()===5)
@@ -47,7 +47,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(sconf)
     val expand_size = 100
     val data = sc.parallelize((1 to 5).toSeq).
-      flatMap(x => Stream.range(0, expand_size))
+      flatMap(x => LazyList.range(0, expand_size))
     val persisted = data.persist(StorageLevel.MEMORY_ONLY)
     assert(persisted.count()===500)
     assert(persisted.filter(_==1).count()===5)
@@ -59,7 +59,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(sconf)
     val expand_size = 500
     val data = sc.parallelize(Seq(1, 2)).
-      flatMap(x => Stream.range(1, expand_size).
+      flatMap(x => LazyList.range(1, expand_size).
       map(y => "%d: string test %d".format(y, x)))
     val persisted = data.persist(StorageLevel.MEMORY_ONLY_SER)
     assert(persisted.filter(_.startsWith("1:")).count()===2)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 0a080423b10..e45f0d72a5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2650,7 +2650,7 @@ case class Concat(children: Seq[Expression]) extends ComplexTypeMergingExpressio
       }
     case ArrayType(elementType, _) =>
       input => {
-        val inputs = children.toStream.map(_.eval(input))
+        val inputs = children.to(LazyList).map(_.eval(input))
         if (inputs.contains(null)) {
           null
         } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index 2cca7b844cc..e1a9e8b5ea8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -67,7 +67,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
   /**
    * Return a stream of expressions in which the original expression is projected with `aliasMap`.
    */
-  protected def projectExpression(expr: Expression): Stream[Expression] = {
+  protected def projectExpression(expr: Expression): LazyList[Expression] = {
     val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
     expr.multiTransformDown {
       // Mapping with aliases
@@ -103,7 +103,7 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
       // but if only `b AS y` can be projected we can't return `Seq(SortOrder(y))`.
       orderingExpressions.iterator.map { sortOrder =>
         val orderingSet = mutable.Set.empty[Expression]
-        val sameOrderings = sortOrder.children.toStream
+        val sameOrderings = sortOrder.children.to(LazyList)
           .flatMap(projectExpression)
           .filter(e => orderingSet.add(e.canonicalized))
           .take(aliasCandidateLimit)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 36bdc4c5b02..c58a1c4c0f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -220,7 +220,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
       case Some(value) => Some(recursiveTransform(value))
       case m: Map[_, _] => m
       case d: DataType => d // Avoid unpacking Structs
-      case stream: Stream[_] => stream.map(recursiveTransform).force
+      case stream: LazyList[_] => stream.map(recursiveTransform).force
       case seq: Iterable[_] => seq.map(recursiveTransform)
       case other: AnyRef => other
       case null => null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 0e5dd301953..99dbda4027f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -356,8 +356,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
     val newArgs = mapProductIterator {
       case s: StructType => s // Don't convert struct types to some other type of Seq[StructField]
       // Handle Seq[TreeNode] in TreeNode parameters.
-      case s: Stream[_] =>
-        // Stream is lazy so we need to force materialization
+      case s: LazyList[_] =>
+        // LazyList is lazy so we need to force materialization
         s.map(mapChild).force
       case s: Seq[_] =>
         s.map(mapChild)
@@ -557,7 +557,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
    * @return     the stream of alternatives
    */
   def multiTransformDown(
-      rule: PartialFunction[BaseType, Seq[BaseType]]): Stream[BaseType] = {
+      rule: PartialFunction[BaseType, Seq[BaseType]]): LazyList[BaseType] = {
     multiTransformDownWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule)
   }
 
@@ -567,11 +567,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
    *
    * As it is very easy to generate enormous number of alternatives when the input tree is huge or
    * when the rule returns many alternatives for many nodes, this function returns the alternatives
-   * as a lazy `Stream` to be able to limit the number of alternatives generated at the caller side
-   * as needed.
+   * as a lazy `LazyList` to be able to limit the number of alternatives generated at the caller
+   * side as needed.
    *
    * The purpose of this function to access the returned alternatives by the rule only if they are
-   * needed so the rule can return a `Stream` whose elements are also lazily calculated.
+   * needed so the rule can return a `LazyList` whose elements are also lazily calculated.
    * E.g. `multiTransform*` calls can be nested with the help of
    * `MultiTransform.generateCartesianProduct()`.
    *
@@ -579,7 +579,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
    * the original node without any transformation is a valid alternative.
    *
    * The rule can return `Seq.empty` to indicate that the original node should be pruned. In this
-   * case `multiTransform()` returns an empty `Stream`.
+   * case `multiTransform()` returns an empty `LazyList`.
    *
    * Please consider the following examples of `input.multiTransformDown(rule)`:
    *
@@ -593,7 +593,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
    *   `Add(a, b)` => `Seq(11, 12, 21, 22)`
    *
    * The output is:
-   *   `Stream(11, 12, 21, 22)`
+   *   `LazyList(11, 12, 21, 22)`
    *
    * 2.
    * In the previous example if we want to generate alternatives of `a` and `b` too then we need to
@@ -603,7 +603,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
    *   `Add(a, b)` => `Seq(11, 12, 21, 22, Add(a, b))`
    *
    * The output is:
-   *   `Stream(11, 12, 21, 22, Add(1, 10), Add(2, 10), Add(1, 20), Add(2, 20))`
+   *   `LazyList(11, 12, 21, 22, Add(1, 10), Add(2, 10), Add(1, 20), Add(2, 20))`
    *
    * @param rule   a function used to generate alternatives for a node
    * @param cond   a Lambda expression to prune tree traversals. If `cond.apply` returns false
@@ -619,15 +619,15 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
   def multiTransformDownWithPruning(
       cond: TreePatternBits => Boolean,
       ruleId: RuleId = UnknownRuleId
-    )(rule: PartialFunction[BaseType, Seq[BaseType]]): Stream[BaseType] = {
+    )(rule: PartialFunction[BaseType, Seq[BaseType]]): LazyList[BaseType] = {
     if (!cond.apply(this) || isRuleIneffective(ruleId)) {
-      return Stream(this)
+      return LazyList(this)
     }
 
     // We could return `Seq(this)` if the `rule` doesn't apply and handle both
     // - the doesn't apply
     // - and the rule returns a one element `Seq(originalNode)`
-    // cases together. The returned `Seq` can be a `Stream` and unfortunately it doesn't seem like
+    // cases together. The returned `Seq` can be a `LazyList` and unfortunately it doesn't seem like
     // there is a way to match on a one element stream without eagerly computing the tail's head.
     // This contradicts with the purpose of only taking the necessary elements from the
     // alternatives. I.e. the "multiTransformDown is lazy" test case in `TreeNodeSuite` would fail.
@@ -641,18 +641,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       })
     }
 
-    val afterRulesStream = if (afterRules.isEmpty) {
+    val afterRulesLazyList = if (afterRules.isEmpty) {
       if (ruleApplied) {
         // If the rule returned with empty alternatives then prune
-        Stream.empty
+        LazyList.empty
       } else {
         // If the rule was not applied then keep the original node
         this.markRuleAsIneffective(ruleId)
-        Stream(this)
+        LazyList(this)
       }
     } else {
       // If the rule was applied then use the returned alternatives
-      afterRules.toStream.map { afterRule =>
+      afterRules.to(LazyList).map { afterRule =>
         if (this fastEquals afterRule) {
           this
         } else {
@@ -662,13 +662,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       }
     }
 
-    afterRulesStream.flatMap { afterRule =>
+    afterRulesLazyList.flatMap { afterRule =>
       if (afterRule.containsChild.nonEmpty) {
         MultiTransform.generateCartesianProduct(
             afterRule.children.map(c => () => c.multiTransformDownWithPruning(cond, ruleId)(rule)))
           .map(afterRule.withNewChildren)
       } else {
-        Stream(afterRule)
+        LazyList(afterRule)
       }
     }
   }
@@ -792,7 +792,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
         case other => other
       }.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
       case d: DataType => d // Avoid unpacking Structs
-      case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
+      case args: LazyList[_] => args.map(mapChild).force // Force materialization on stream
       case args: Iterable[_] => args.map(mapChild)
       case nonChild: AnyRef => nonChild
       case null => null
@@ -1321,8 +1321,8 @@ object MultiTransform {
    * @param elementSeqs a list of sequences to build the cartesian product from
    * @return            the stream of generated `Seq` elements
    */
-  def generateCartesianProduct[T](elementSeqs: Seq[() => Seq[T]]): Stream[Seq[T]] = {
-    elementSeqs.foldRight(Stream(Seq.empty[T]))((elements, elementTails) =>
+  def generateCartesianProduct[T](elementSeqs: Seq[() => Seq[T]]): LazyList[Seq[T]] = {
+    elementSeqs.foldRight(LazyList(Seq.empty[T]))((elements, elementTails) =>
       for {
         elementTail <- elementTails
         element <- elements()
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index 8266d30d055..ea0fcac881c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -84,7 +84,7 @@ class LogicalPlanSuite extends SparkFunSuite {
   test("transformExpressions works with a Stream") {
     val id1 = NamedExpression.newExprId
     val id2 = NamedExpression.newExprId
-    val plan = Project(Stream(
+    val plan = Project(LazyList(
       Alias(Literal(1), "a")(exprId = id1),
       Alias(Literal(2), "b")(exprId = id2)),
       OneRowRelation())
@@ -92,7 +92,7 @@ class LogicalPlanSuite extends SparkFunSuite {
       case Literal(v: Int, IntegerType) if v != 1 =>
         Literal(v + 1, IntegerType)
     }
-    val expected = Project(Stream(
+    val expected = Project(LazyList(
       Alias(Literal(1), "a")(exprId = id1),
       Alias(Literal(3), "b")(exprId = id2)),
       OneRowRelation())
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 33ba4f05972..4dbadef93a0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -693,7 +693,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("transform works on stream of children") {
-    val before = Coalesce(Stream(Literal(1), Literal(2)))
+    val before = Coalesce(LazyList(Literal(1), Literal(2)))
     // Note it is a bit tricky to exhibit the broken behavior. Basically we want to create the
     // situation in which the TreeNode.mapChildren function's change detection is not triggered. A
     // stream's first element is typically materialized, so in order to not trip the TreeNode change
@@ -702,14 +702,14 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
       case Literal(v: Int, IntegerType) if v != 1 =>
         Literal(v + 1, IntegerType)
     }
-    val expected = Coalesce(Stream(Literal(1), Literal(3)))
+    val expected = Coalesce(LazyList(Literal(1), Literal(3)))
     assert(result === expected)
   }
 
   test("withNewChildren on stream of children") {
-    val before = Coalesce(Stream(Literal(1), Literal(2)))
-    val result = before.withNewChildren(Stream(Literal(1), Literal(3)))
-    val expected = Coalesce(Stream(Literal(1), Literal(3)))
+    val before = Coalesce(LazyList(Literal(1), Literal(2)))
+    val result = before.withNewChildren(LazyList(Literal(1), Literal(3)))
+    val expected = Coalesce(LazyList(Literal(1), Literal(3)))
     assert(result === expected)
   }
 
@@ -949,9 +949,9 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
-  private def newErrorAfterStream(es: Expression*) = {
-    es.toStream.append(
-      throw new NoSuchElementException("Stream should not return more elements")
+  private def newErrorAfterLazyList(es: Expression*) = {
+    es.to(LazyList).lazyAppendedAll(
+      throw new NoSuchElementException("LazyList should not return more elements")
     )
   }
 
@@ -975,8 +975,8 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
     val e = Add(Add(Literal("a"), Literal("b")), Add(Literal("c"), Literal("d")))
     val transformed = e.multiTransformDown {
       case StringLiteral("a") => Seq(Literal(1), Literal(2), Literal(3))
-      case StringLiteral("b") => newErrorAfterStream(Literal(10))
-      case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterStream(Literal(100))
+      case StringLiteral("b") => newErrorAfterLazyList(Literal(10))
+      case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterLazyList(Literal(100))
     }
     val expected = for {
       a <- Seq(Literal(1), Literal(2), Literal(3))
@@ -990,7 +990,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
     val transformed2 = e.multiTransformDown {
       case StringLiteral("a") => Seq(Literal(1), Literal(2), Literal(3))
       case StringLiteral("b") => Seq(Literal(10), Literal(20), Literal(30))
-      case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterStream(Literal(100))
+      case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterLazyList(Literal(100))
     }
     val expected2 = for {
       b <- Seq(Literal(10), Literal(20), Literal(30))
@@ -1055,7 +1055,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
   test("multiTransformDown alternatives are generated only if needed") {
     val e = Add(Add(Literal("a"), Literal("b")), Add(Literal("c"), Literal("d")))
     val transformed = e.multiTransformDown {
-      case StringLiteral("a") => newErrorAfterStream()
+      case StringLiteral("a") => newErrorAfterLazyList()
       case StringLiteral("b") => Seq.empty
     }
     assert(transformed.isEmpty)
@@ -1075,10 +1075,10 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
         a_or_b.getOrElse(
          // Besides returning the alternatives for the first encounter, also set up a mechanism to
           // update the cache when the new alternatives are requested.
-          Stream(Literal(1), Literal(2)).map { x =>
+          LazyList(Literal(1), Literal(2)).map { x =>
             a_or_b = Some(Seq(x))
             x
-          }.append {
+          }.lazyAppendedAll {
             a_or_b = None
             Seq.empty
           })
@@ -1094,19 +1094,19 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
     val transformed2 = e.multiTransformDown {
       case StringLiteral("a") | StringLiteral("b") =>
         a_or_b.getOrElse(
-          Stream(Literal(1), Literal(2)).map { x =>
+          LazyList(Literal(1), Literal(2)).map { x =>
             a_or_b = Some(Seq(x))
             x
-          }.append {
+          }.lazyAppendedAll {
             a_or_b = None
             Seq.empty
           })
       case StringLiteral("c") | StringLiteral("d") =>
         c_or_d.getOrElse(
-          Stream(Literal(10), Literal(20)).map { x =>
+          LazyList(Literal(10), Literal(20)).map { x =>
             c_or_d = Some(Seq(x))
             x
-          }.append {
+          }.lazyAppendedAll {
             c_or_d = None
             Seq.empty
           })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 27093ced751..7e15c0baf52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -59,8 +59,8 @@ class RelationalGroupedDataset protected[sql](
   private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
     val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {
       groupingExprs match {
-        // call `toList` because `Stream` can't serialize in scala 2.13
-        case s: Stream[Expression] => s.toList ++ aggExprs
+        // call `toList` because `LazyList` can't serialize in scala 2.13
+        case s: LazyList[Expression] => s.toList ++ aggExprs
         case other => other ++ aggExprs
       }
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
index e1dcab80af3..67d2849e005 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
@@ -42,7 +42,7 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
           projectExpression(e)
             .filter(e => partitioningSet.add(e.canonicalized))
             .take(aliasCandidateLimit)
-            .asInstanceOf[Stream[Partitioning]]
+            .asInstanceOf[LazyList[Partitioning]]
         case o => Seq(o)
       }
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 40de623f73d..0aefb0649d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -164,7 +164,7 @@ trait CodegenSupport extends SparkPlan {
       }
 
     val inputVars = inputVarsCandidate match {
-      case stream: Stream[ExprCode] => stream.force
+      case stream: LazyList[ExprCode] => stream.force
       case other => other
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 9f9f8743146..68022757ff2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -115,7 +115,7 @@ case class BroadcastHashJoinExec(
     PartitioningCollection(partitioning.multiTransformDown {
       case e: Expression if streamedKeyToBuildKeyMapping.contains(e.canonicalized) =>
         e +: streamedKeyToBuildKeyMapping(e.canonicalized)
-    }.asInstanceOf[Stream[HashPartitioning]]
+    }.asInstanceOf[LazyList[HashPartitioning]]
       .take(conf.broadcastHashJoinOutputPartitioningExpandLimit))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index ccfdddd039d..f34d7cf3680 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1617,7 +1617,7 @@ class DataFrameAggregateSuite extends QueryTest
   }
 
   test("SPARK-38221: group by stream of complex expressions should not fail") {
-    val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id")
+    val df = Seq(1).toDF("id").groupBy(LazyList($"id" + 1, $"id" + 2): _*).sum("id")
     checkAnswer(df, Row(2, 3, 1))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 9454acba87b..84133eb485f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1232,7 +1232,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     ).toDF("a", "b", "c")
 
     val w = Window.partitionBy("a").orderBy("b")
-    val selectExprs = Stream(
+    val selectExprs = LazyList(
       sum("c").over(w.rowsBetween(Window.unboundedPreceding, 
Window.currentRow)).as("sumc"),
       avg("c").over(w.rowsBetween(Window.unboundedPreceding, 
Window.currentRow)).as("avgc")
     )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
index 8556ccf74b7..6768c5fd07b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
 
 import java.util.concurrent.LinkedBlockingQueue
 
-import scala.collection.immutable.Stream
 import scala.sys.process._
 import scala.util.Try
 
@@ -46,7 +45,7 @@ object BlockingLineStream {
   private final class BlockingStreamed[T](
     val process: T => Unit,
     val done: Int => Unit,
-    val stream: () => Stream[T])
+    val stream: () => LazyList[T])
 
   // See scala.sys.process.Streamed
   private object BlockingStreamed {
@@ -57,11 +56,11 @@ object BlockingLineStream {
     def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
       val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
 
-      def next(): Stream[T] = q.take match {
-        case Left(0) => Stream.empty
+      def next(): LazyList[T] = q.take match {
+        case Left(0) => LazyList.empty
         case Left(code) =>
-          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
-        case Right(s) => Stream.cons(s, next())
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else LazyList.empty
+        case Right(s) => LazyList.cons(s, next())
       }
 
       new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
@@ -79,7 +78,7 @@ object BlockingLineStream {
     }
   }
 
-  def apply(command: Seq[String]): Stream[String] = {
+  def apply(command: Seq[String]): LazyList[String] = {
     val streamed = BlockingStreamed[String](true)
     val process = command.run(BasicIO(false, streamed.process, None))
     Spawn(streamed.done(process.exitValue()))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index afc152e072a..9b4ad768818 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -457,7 +457,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
   test("SPARK-38528: generator in stream of aggregate expressions") {
     val df = Seq(1, 2, 3).toDF("v")
     checkAnswer(
-      df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
+      df.select(LazyList(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
       Row(1, 6) :: Row(3, 6) :: Nil)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 8c293bab641..c41b85f75e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1714,7 +1714,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     val dsA = Seq((1, "a")).toDF("id", "c1")
     val dsB = Seq((2, "b")).toDF("id", "c2")
     val dsC = Seq((3, "c")).toDF("id", "c3")
-    val joined = dsA.join(dsB, Stream("id"), "full_outer").join(dsC, Stream("id"), "full_outer")
+    val joined = dsA.join(dsB, LazyList("id"), "full_outer").join(dsC, LazyList("id"), "full_outer")
 
     val expected = Seq(Row(1, "a", null, null), Row(2, null, "b", null), Row(3, null, null, "c"))
 
@@ -1723,7 +1723,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
 
   test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with 
invalid access") {
     val ds = Seq((1, "a")).toDF("id", "c1")
-    val joined = ds.join(ds, Stream("id"), "full_outer").join(ds, Stream("id"), "full_outer")
+    val joined = ds.join(ds, LazyList("id"), "full_outer").join(ds, LazyList("id"), "full_outer")
 
     val expected = Seq(Row(1, "a", "a", "a"))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index de24b8c82b0..c5b1e68fb91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -743,7 +743,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   }
 
   test("SPARK-24500: create union with stream of children") {
-    val df = Union(Stream(
+    val df = Union(LazyList(
       Range(1, 1, 1, 1),
       Range(1, 2, 1, 1)))
     df.queryExecution.executedPlan.execute()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 03e56cb9532..4cb9ae7cbc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -108,7 +108,7 @@ class SortSuite extends SparkPlanTest with SharedSparkSession {
     )
     checkAnswer(
       input.toDF("a", "b", "c"),
-      (child: SparkPlan) => SortExec(Stream($"a".asc, $"b".asc, $"c".asc),
+      (child: SparkPlan) => SortExec(LazyList($"a".asc, $"b".asc, $"c".asc),
         global = true, child = child),
       input.sortBy(t => (t._1, t._2, t._3)).map(Row.fromTuple),
       sortAnswers = false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 08db88b0224..146a583d773 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -776,14 +776,14 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
     val b = Seq((1, "a")).toDF("key", "value")
     val c = Seq(1).toDF("key")
 
-    val ab = a.join(b, Stream("key"), "left")
+    val ab = a.join(b, LazyList("key"), "left")
     val abc = ab.join(c, Seq("key"), "left")
 
     checkAnswer(abc, Row(1, "a"))
   }
 
   test("SPARK-26680: Stream in groupBy does not cause StackOverflowError") {
-    val groupByCols = Stream(col("key"))
+    val groupByCols = LazyList(col("key"))
     val df = Seq((1, 2), (2, 3), (1, 3)).toDF("key", "value")
       .groupBy(groupByCols: _*)
       .max("value")

