This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b92265a98f2  [SPARK-45685][CORE][SQL] Use `LazyList` instead of `Stream`
b92265a98f2 is described below

commit b92265a98f241b333467a02f4fffc9889ad3e7da
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun Oct 29 22:43:54 2023 -0700

[SPARK-45685][CORE][SQL] Use `LazyList` instead of `Stream`

### What changes were proposed in this pull request?
This PR changes the code to use `LazyList` instead of `Stream`, because `Stream` has been marked as deprecated since Scala 2.13.0.

- `class Stream`
```scala
@deprecated("Use LazyList (which is fully lazy) instead of Stream (which has a lazy tail only)", "2.13.0")
@SerialVersionUID(3L)
sealed abstract class Stream[+A] extends AbstractSeq[A]
  with LinearSeq[A]
  with LinearSeqOps[A, Stream, Stream[A]]
  with IterableFactoryDefaults[A, Stream]
  with Serializable {
  ...
  @deprecated("The `append` operation has been renamed `lazyAppendedAll`", "2.13.0")
  @inline final def append[B >: A](rest: => IterableOnce[B]): Stream[B] = lazyAppendedAll(rest)
```
- `object Stream`
```scala
@deprecated("Use LazyList (which is fully lazy) instead of Stream (which has a lazy tail only)", "2.13.0")
@SerialVersionUID(3L)
object Stream extends SeqFactory[Stream] {
```
- `type Stream` and `val Stream`
```scala
@deprecated("Use LazyList instead of Stream", "2.13.0")
type Stream[+A] = scala.collection.immutable.Stream[A]

@deprecated("Use LazyList instead of Stream", "2.13.0")
val Stream = scala.collection.immutable.Stream
```
- method `toStream` in trait `IterableOnceOps`
```scala
@deprecated("Use .to(LazyList) instead of .toStream", "2.13.0")
@inline final def toStream: immutable.Stream[A] = to(immutable.Stream)
```

### Why are the changes needed?
Clean up deprecated Scala API usage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43563 from LuciferYang/stream-2-lazylist.
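For readers not familiar with the migration, the replacements throughout this patch follow a small set of mechanical rewrites from the deprecated `Stream` API to its `LazyList` counterpart. Below is a minimal, self-contained sketch of those rewrites; the object name and sample values are illustrative only and are not taken from the Spark code base.

```scala
// Hypothetical sketch of the Stream -> LazyList rewrites applied in this patch.
object StreamToLazyListSketch {
  def main(args: Array[String]): Unit = {
    // Stream.continually(...) -> LazyList.continually(...), and seq.toStream -> seq.to(LazyList)
    val repeated = LazyList.continually(List(1, 2, 3).to(LazyList)).flatten.take(7).toArray
    assert(repeated.sameElements(Array(1, 2, 3, 1, 2, 3, 1)))

    // stream.append(rest) -> lazyList.lazyAppendedAll(rest); the suffix stays unevaluated until reached
    val appended = Seq(10, 20, 30).to(LazyList).lazyAppendedAll(LazyList.from(100))
    assert(appended.take(4).toList == List(10, 20, 30, 100))

    // Stream.range / Stream.cons -> LazyList.range / LazyList.cons
    val ranged = LazyList.cons(0, LazyList.range(1, 5))

    // .force still materializes every element eagerly when that is required
    assert(ranged.map(_ + 1).force.toList == List(1, 2, 3, 4, 5))
  }
}
```

Unlike `Stream`, `LazyList` is lazy in its head as well as its tail, which is why the `.force` calls that materialize plan children in the patch below remain in place after the rename.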
Authored-by: yangjie01 <yangji...@baidu.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../input/WholeTextFileInputFormatSuite.scala       |  2 +-
 .../input/WholeTextFileRecordReaderSuite.scala      |  2 +-
 .../spark/storage/FlatmapIteratorSuite.scala        |  6 ++--
 .../expressions/collectionOperations.scala          |  2 +-
 .../plans/AliasAwareOutputExpression.scala          |  4 +--
 .../spark/sql/catalyst/plans/QueryPlan.scala        |  2 +-
 .../apache/spark/sql/catalyst/trees/TreeNode.scala  | 42 +++++++++++-----------
 .../sql/catalyst/plans/LogicalPlanSuite.scala       |  4 +--
 .../spark/sql/catalyst/trees/TreeNodeSuite.scala    | 36 +++++++++----------
 .../spark/sql/RelationalGroupedDataset.scala        |  4 +--
 .../sql/execution/AliasAwareOutputExpression.scala  |  2 +-
 .../sql/execution/WholeStageCodegenExec.scala       |  2 +-
 .../execution/joins/BroadcastHashJoinExec.scala     |  2 +-
 .../apache/spark/sql/DataFrameAggregateSuite.scala  |  2 +-
 .../spark/sql/DataFrameWindowFunctionsSuite.scala   |  2 +-
 .../scala/org/apache/spark/sql/GenTPCDSData.scala   | 13 ++++---
 .../apache/spark/sql/GeneratorFunctionSuite.scala   |  2 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala      |  4 +--
 .../apache/spark/sql/execution/PlannerSuite.scala   |  2 +-
 .../org/apache/spark/sql/execution/SortSuite.scala  |  2 +-
 .../sql/execution/WholeStageCodegenSuite.scala      |  4 +--
 21 files changed, 70 insertions(+), 71 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
index 417e711e9c0..26c1be259ad 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
@@ -83,6 +83,6 @@ object WholeTextFileInputFormatSuite {
   private val fileLengths = Array(10, 100, 1000)
   private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
-    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+    filename -> LazyList.continually(testWords.toList.to(LazyList)).flatten.take(upperBound).toArray
   }.toMap
 }
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index c833d22b3be..e64ebe2a551 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -145,6 +145,6 @@ object WholeTextFileRecordReaderSuite {
   private val fileLengths = Array(10, 100, 1000)
   private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
-    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+    filename -> LazyList.continually(testWords.toList.to(LazyList)).flatten.take(upperBound).toArray
   }.toMap
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
index e719c722d01..cb59766b08a 100644
--- a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
@@ -36,7 +36,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(sconf)
     val expand_size = 100
     val data = sc.parallelize((1 to 5).toSeq).
-      flatMap( x => Stream.range(0, expand_size))
+      flatMap( x => LazyList.range(0, expand_size))
     val persisted = data.persist(StorageLevel.DISK_ONLY)
     assert(persisted.count()===500)
     assert(persisted.filter(_==1).count()===5)
@@ -47,7 +47,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(sconf)
     val expand_size = 100
     val data = sc.parallelize((1 to 5).toSeq).
-      flatMap(x => Stream.range(0, expand_size))
+      flatMap(x => LazyList.range(0, expand_size))
     val persisted = data.persist(StorageLevel.MEMORY_ONLY)
     assert(persisted.count()===500)
     assert(persisted.filter(_==1).count()===5)
@@ -59,7 +59,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(sconf)
     val expand_size = 500
     val data = sc.parallelize(Seq(1, 2)).
-      flatMap(x => Stream.range(1, expand_size).
+      flatMap(x => LazyList.range(1, expand_size).
        map(y => "%d: string test %d".format(y, x)))
     val persisted = data.persist(StorageLevel.MEMORY_ONLY_SER)
     assert(persisted.filter(_.startsWith("1:")).count()===2)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 0a080423b10..e45f0d72a5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2650,7 +2650,7 @@ case class Concat(children: Seq[Expression]) extends ComplexTypeMergingExpressio
       }
     case ArrayType(elementType, _) =>
       input => {
-        val inputs = children.toStream.map(_.eval(input))
+        val inputs = children.to(LazyList).map(_.eval(input))
         if (inputs.contains(null)) {
           null
         } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index 2cca7b844cc..e1a9e8b5ea8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -67,7 +67,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
   /**
    * Return a stream of expressions in which the original expression is projected with `aliasMap`.
    */
-  protected def projectExpression(expr: Expression): Stream[Expression] = {
+  protected def projectExpression(expr: Expression): LazyList[Expression] = {
     val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
     expr.multiTransformDown {
       // Mapping with aliases
@@ -103,7 +103,7 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
     // but if only `b AS y` can be projected we can't return `Seq(SortOrder(y))`.
     orderingExpressions.iterator.map { sortOrder =>
       val orderingSet = mutable.Set.empty[Expression]
-      val sameOrderings = sortOrder.children.toStream
+      val sameOrderings = sortOrder.children.to(LazyList)
         .flatMap(projectExpression)
         .filter(e => orderingSet.add(e.canonicalized))
         .take(aliasCandidateLimit)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 36bdc4c5b02..c58a1c4c0f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -220,7 +220,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
       case Some(value) => Some(recursiveTransform(value))
       case m: Map[_, _] => m
       case d: DataType => d // Avoid unpacking Structs
-      case stream: Stream[_] => stream.map(recursiveTransform).force
+      case stream: LazyList[_] => stream.map(recursiveTransform).force
       case seq: Iterable[_] => seq.map(recursiveTransform)
       case other: AnyRef => other
       case null => null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 0e5dd301953..99dbda4027f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -356,8 +356,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
     val newArgs = mapProductIterator {
       case s: StructType => s // Don't convert struct types to some other type of Seq[StructField]
       // Handle Seq[TreeNode] in TreeNode parameters.
-      case s: Stream[_] =>
-        // Stream is lazy so we need to force materialization
+      case s: LazyList[_] =>
+        // LazyList is lazy so we need to force materialization
         s.map(mapChild).force
       case s: Seq[_] =>
         s.map(mapChild)
@@ -557,7 +557,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
   * @return the stream of alternatives
   */
  def multiTransformDown(
-      rule: PartialFunction[BaseType, Seq[BaseType]]): Stream[BaseType] = {
+      rule: PartialFunction[BaseType, Seq[BaseType]]): LazyList[BaseType] = {
    multiTransformDownWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule)
  }
@@ -567,11 +567,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
   *
   * As it is very easy to generate enormous number of alternatives when the input tree is huge or
   * when the rule returns many alternatives for many nodes, this function returns the alternatives
-  * as a lazy `Stream` to be able to limit the number of alternatives generated at the caller side
-  * as needed.
+  * as a lazy `LazyList` to be able to limit the number of alternatives generated at the caller
+  * side as needed.
   *
   * The purpose of this function to access the returned alternatives by the rule only if they are
-  * needed so the rule can return a `Stream` whose elements are also lazily calculated.
+  * needed so the rule can return a `LazyList` whose elements are also lazily calculated.
   * E.g. `multiTransform*` calls can be nested with the help of
   * `MultiTransform.generateCartesianProduct()`.
   *
@@ -579,7 +579,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
   * the original node without any transformation is a valid alternative.
   *
   * The rule can return `Seq.empty` to indicate that the original node should be pruned. In this
-  * case `multiTransform()` returns an empty `Stream`.
+  * case `multiTransform()` returns an empty `LazyList`.
   *
   * Please consider the following examples of `input.multiTransformDown(rule)`:
   *
   * 1.
@@ -593,7 +593,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
   * `Add(a, b)` => `Seq(11, 12, 21, 22)`
   *
   * The output is:
-  * `Stream(11, 12, 21, 22)`
+  * `LazyList(11, 12, 21, 22)`
   *
   * 2.
   * In the previous example if we want to generate alternatives of `a` and `b` too then we need to
@@ -603,7 +603,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
   * `Add(a, b)` => `Seq(11, 12, 21, 22, Add(a, b))`
   *
   * The output is:
-  * `Stream(11, 12, 21, 22, Add(1, 10), Add(2, 10), Add(1, 20), Add(2, 20))`
+  * `LazyList(11, 12, 21, 22, Add(1, 10), Add(2, 10), Add(1, 20), Add(2, 20))`
   *
   * @param rule a function used to generate alternatives for a node
   * @param cond a Lambda expression to prune tree traversals. If `cond.apply` returns false
@@ -619,15 +619,15 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
  def multiTransformDownWithPruning(
      cond: TreePatternBits => Boolean,
      ruleId: RuleId = UnknownRuleId
-  )(rule: PartialFunction[BaseType, Seq[BaseType]]): Stream[BaseType] = {
+  )(rule: PartialFunction[BaseType, Seq[BaseType]]): LazyList[BaseType] = {
    if (!cond.apply(this) || isRuleIneffective(ruleId)) {
-      return Stream(this)
+      return LazyList(this)
    }
    // We could return `Seq(this)` if the `rule` doesn't apply and handle both
    // - the doesn't apply
    // - and the rule returns a one element `Seq(originalNode)`
-    // cases together. The returned `Seq` can be a `Stream` and unfortunately it doesn't seem like
+    // cases together. The returned `Seq` can be a `LazyList` and unfortunately it doesn't seem like
    // there is a way to match on a one element stream without eagerly computing the tail's head.
    // This contradicts with the purpose of only taking the necessary elements from the
    // alternatives. I.e. the "multiTransformDown is lazy" test case in `TreeNodeSuite` would fail.
@@ -641,18 +641,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
      })
    }
-    val afterRulesStream = if (afterRules.isEmpty) {
+    val afterRulesLazyList = if (afterRules.isEmpty) {
      if (ruleApplied) {
        // If the rule returned with empty alternatives then prune
-        Stream.empty
+        LazyList.empty
      } else {
        // If the rule was not applied then keep the original node
        this.markRuleAsIneffective(ruleId)
-        Stream(this)
+        LazyList(this)
      }
    } else {
      // If the rule was applied then use the returned alternatives
-      afterRules.toStream.map { afterRule =>
+      afterRules.to(LazyList).map { afterRule =>
        if (this fastEquals afterRule) {
          this
        } else {
@@ -662,13 +662,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
      }
    }
-    afterRulesStream.flatMap { afterRule =>
+    afterRulesLazyList.flatMap { afterRule =>
      if (afterRule.containsChild.nonEmpty) {
        MultiTransform.generateCartesianProduct(
          afterRule.children.map(c => () => c.multiTransformDownWithPruning(cond, ruleId)(rule)))
          .map(afterRule.withNewChildren)
      } else {
-        Stream(afterRule)
+        LazyList(afterRule)
      }
    }
  }
@@ -792,7 +792,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
        case other => other
      }.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
      case d: DataType => d // Avoid unpacking Structs
-      case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
+      case args: LazyList[_] => args.map(mapChild).force // Force materialization on stream
      case args: Iterable[_] => args.map(mapChild)
      case nonChild: AnyRef => nonChild
      case null => null
@@ -1321,8 +1321,8 @@ object MultiTransform {
   * @param elementSeqs a list of sequences to build the cartesian product from
   * @return the stream of generated `Seq` elements
   */
-  def generateCartesianProduct[T](elementSeqs: Seq[() => Seq[T]]): Stream[Seq[T]] = {
-    elementSeqs.foldRight(Stream(Seq.empty[T]))((elements, elementTails) =>
+  def generateCartesianProduct[T](elementSeqs: Seq[() => Seq[T]]): LazyList[Seq[T]] = {
+    elementSeqs.foldRight(LazyList(Seq.empty[T]))((elements, elementTails) =>
      for {
        elementTail <- elementTails
        element <- elements()
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index 8266d30d055..ea0fcac881c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -84,7 +84,7 @@ class LogicalPlanSuite extends SparkFunSuite {
  test("transformExpressions works with a Stream") {
    val id1 = NamedExpression.newExprId
    val id2 = NamedExpression.newExprId
-    val plan = Project(Stream(
+    val plan = Project(LazyList(
      Alias(Literal(1), "a")(exprId = id1),
      Alias(Literal(2), "b")(exprId = id2)),
      OneRowRelation())
@@ -92,7 +92,7 @@ class LogicalPlanSuite extends SparkFunSuite {
      case Literal(v: Int, IntegerType) if v != 1 => Literal(v + 1, IntegerType)
    }
-    val expected = Project(Stream(
+    val expected = Project(LazyList(
      Alias(Literal(1), "a")(exprId = id1),
      Alias(Literal(3), "b")(exprId = id2)),
      OneRowRelation())
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 33ba4f05972..4dbadef93a0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -693,7 +693,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
  }

  test("transform works on stream of children") {
-    val before = Coalesce(Stream(Literal(1), Literal(2)))
+    val before = Coalesce(LazyList(Literal(1), Literal(2)))
    // Note it is a bit tricky to exhibit the broken behavior. Basically we want to create the
    // situation in which the TreeNode.mapChildren function's change detection is not triggered. A
    // stream's first element is typically materialized, so in order to not trip the TreeNode change
@@ -702,14 +702,14 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
      case Literal(v: Int, IntegerType) if v != 1 => Literal(v + 1, IntegerType)
    }
-    val expected = Coalesce(Stream(Literal(1), Literal(3)))
+    val expected = Coalesce(LazyList(Literal(1), Literal(3)))
    assert(result === expected)
  }

  test("withNewChildren on stream of children") {
-    val before = Coalesce(Stream(Literal(1), Literal(2)))
-    val result = before.withNewChildren(Stream(Literal(1), Literal(3)))
-    val expected = Coalesce(Stream(Literal(1), Literal(3)))
+    val before = Coalesce(LazyList(Literal(1), Literal(2)))
+    val result = before.withNewChildren(LazyList(Literal(1), Literal(3)))
+    val expected = Coalesce(LazyList(Literal(1), Literal(3)))
    assert(result === expected)
  }
@@ -949,9 +949,9 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
    }
  }

-  private def newErrorAfterStream(es: Expression*) = {
-    es.toStream.append(
-      throw new NoSuchElementException("Stream should not return more elements")
+  private def newErrorAfterLazyList(es: Expression*) = {
+    es.to(LazyList).lazyAppendedAll(
+      throw new NoSuchElementException("LazyList should not return more elements")
    )
  }
@@ -975,8 +975,8 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
    val e = Add(Add(Literal("a"), Literal("b")), Add(Literal("c"), Literal("d")))
    val transformed = e.multiTransformDown {
      case StringLiteral("a") => Seq(Literal(1), Literal(2), Literal(3))
-      case StringLiteral("b") => newErrorAfterStream(Literal(10))
-      case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterStream(Literal(100))
+      case StringLiteral("b") => newErrorAfterLazyList(Literal(10))
+      case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterLazyList(Literal(100))
    }
    val expected = for {
      a <- Seq(Literal(1), Literal(2), Literal(3))
@@ -990,7 +990,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
    val transformed2 = e.multiTransformDown {
      case StringLiteral("a") => Seq(Literal(1), Literal(2), Literal(3))
      case StringLiteral("b") => Seq(Literal(10), Literal(20), Literal(30))
-      case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterStream(Literal(100))
+      case Add(StringLiteral("c"), StringLiteral("d"), _) => newErrorAfterLazyList(Literal(100))
    }
    val expected2 = for {
      b <- Seq(Literal(10), Literal(20), Literal(30))
@@ -1055,7 +1055,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
  test("multiTransformDown alternatives are generated only if needed") {
    val e = Add(Add(Literal("a"), Literal("b")), Add(Literal("c"), Literal("d")))
    val transformed = e.multiTransformDown {
-      case StringLiteral("a") => newErrorAfterStream()
+      case StringLiteral("a") => newErrorAfterLazyList()
      case StringLiteral("b") => Seq.empty
    }
    assert(transformed.isEmpty)
@@ -1075,10 +1075,10 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
        a_or_b.getOrElse(
          // Besides returning the alternatives for the first encounter, also set up a mechanism to
          // update the cache when the new alternatives are requested.
-          Stream(Literal(1), Literal(2)).map { x =>
+          LazyList(Literal(1), Literal(2)).map { x =>
            a_or_b = Some(Seq(x))
            x
-          }.append {
+          }.lazyAppendedAll {
            a_or_b = None
            Seq.empty
          })
@@ -1094,19 +1094,19 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
    val transformed2 = e.multiTransformDown {
      case StringLiteral("a") | StringLiteral("b") =>
        a_or_b.getOrElse(
-          Stream(Literal(1), Literal(2)).map { x =>
+          LazyList(Literal(1), Literal(2)).map { x =>
            a_or_b = Some(Seq(x))
            x
-          }.append {
+          }.lazyAppendedAll {
            a_or_b = None
            Seq.empty
          })
      case StringLiteral("c") | StringLiteral("d") =>
        c_or_d.getOrElse(
-          Stream(Literal(10), Literal(20)).map { x =>
+          LazyList(Literal(10), Literal(20)).map { x =>
            c_or_d = Some(Seq(x))
            x
-          }.append {
+          }.lazyAppendedAll {
            c_or_d = None
            Seq.empty
          })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 27093ced751..7e15c0baf52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -59,8 +59,8 @@ class RelationalGroupedDataset protected[sql](
  private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
    val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {
      groupingExprs match {
-        // call `toList` because `Stream` can't serialize in scala 2.13
-        case s: Stream[Expression] => s.toList ++ aggExprs
+        // call `toList` because `LazyList` can't serialize in scala 2.13
+        case s: LazyList[Expression] => s.toList ++ aggExprs
        case other => other ++ aggExprs
      }
    } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
index e1dcab80af3..67d2849e005 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
@@ -42,7 +42,7 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
        projectExpression(e)
          .filter(e => partitioningSet.add(e.canonicalized))
          .take(aliasCandidateLimit)
-          .asInstanceOf[Stream[Partitioning]]
+          .asInstanceOf[LazyList[Partitioning]]
        case o => Seq(o)
      }
    } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 40de623f73d..0aefb0649d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -164,7 +164,7 @@ trait CodegenSupport extends SparkPlan {
    }

    val inputVars = inputVarsCandidate match {
-      case stream: Stream[ExprCode] => stream.force
+      case stream: LazyList[ExprCode] => stream.force
      case other => other
    }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 9f9f8743146..68022757ff2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -115,7 +115,7 @@ case class BroadcastHashJoinExec(
      PartitioningCollection(partitioning.multiTransformDown {
        case e: Expression if streamedKeyToBuildKeyMapping.contains(e.canonicalized) =>
          e +: streamedKeyToBuildKeyMapping(e.canonicalized)
-      }.asInstanceOf[Stream[HashPartitioning]]
+      }.asInstanceOf[LazyList[HashPartitioning]]
        .take(conf.broadcastHashJoinOutputPartitioningExpandLimit))
  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index ccfdddd039d..f34d7cf3680 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1617,7 +1617,7 @@ class DataFrameAggregateSuite extends QueryTest
  }

  test("SPARK-38221: group by stream of complex expressions should not fail") {
-    val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id")
+    val df = Seq(1).toDF("id").groupBy(LazyList($"id" + 1, $"id" + 2): _*).sum("id")
    checkAnswer(df, Row(2, 3, 1))
  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 9454acba87b..84133eb485f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1232,7 +1232,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
    ).toDF("a", "b", "c")

    val w = Window.partitionBy("a").orderBy("b")
-    val selectExprs = Stream(
+    val selectExprs = LazyList(
      sum("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("sumc"),
      avg("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("avgc")
    )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
index 8556ccf74b7..6768c5fd07b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql

 import java.util.concurrent.LinkedBlockingQueue

-import scala.collection.immutable.Stream
 import scala.sys.process._
 import scala.util.Try
@@ -46,7 +45,7 @@ object BlockingLineStream {
  private final class BlockingStreamed[T](
    val process: T => Unit,
    val done: Int => Unit,
-    val stream: () => Stream[T])
+    val stream: () => LazyList[T])

  // See scala.sys.process.Streamed
  private object BlockingStreamed {
@@ -57,11 +56,11 @@ object BlockingLineStream {
    def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
      val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)

-      def next(): Stream[T] = q.take match {
-        case Left(0) => Stream.empty
+      def next(): LazyList[T] = q.take match {
+        case Left(0) => LazyList.empty
        case Left(code) =>
-          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
-        case Right(s) => Stream.cons(s, next())
+          if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else LazyList.empty
+        case Right(s) => LazyList.cons(s, next())
      }

      new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
@@ -79,7 +78,7 @@ object BlockingLineStream {
    }
  }

-  def apply(command: Seq[String]): Stream[String] = {
+  def apply(command: Seq[String]): LazyList[String] = {
    val streamed = BlockingStreamed[String](true)
    val process = command.run(BasicIO(false, streamed.process, None))
    Spawn(streamed.done(process.exitValue()))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index afc152e072a..9b4ad768818 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -457,7 +457,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
  test("SPARK-38528: generator in stream of aggregate expressions") {
    val df = Seq(1, 2, 3).toDF("v")
    checkAnswer(
-      df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
+      df.select(LazyList(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
      Row(1, 6) :: Row(3, 6) :: Nil)
  }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 8c293bab641..c41b85f75e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1714,7 +1714,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
    val dsA = Seq((1, "a")).toDF("id", "c1")
    val dsB = Seq((2, "b")).toDF("id", "c2")
    val dsC = Seq((3, "c")).toDF("id", "c3")
-    val joined = dsA.join(dsB, Stream("id"), "full_outer").join(dsC, Stream("id"), "full_outer")
+    val joined = dsA.join(dsB, LazyList("id"), "full_outer").join(dsC, LazyList("id"), "full_outer")

    val expected = Seq(Row(1, "a", null, null), Row(2, null, "b", null), Row(3, null, null, "c"))
@@ -1723,7 +1723,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
  test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with invalid access") {
    val ds = Seq((1, "a")).toDF("id", "c1")
-    val joined = ds.join(ds, Stream("id"), "full_outer").join(ds, Stream("id"), "full_outer")
+    val joined = ds.join(ds, LazyList("id"), "full_outer").join(ds, LazyList("id"), "full_outer")

    val expected = Seq(Row(1, "a", "a", "a"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index de24b8c82b0..c5b1e68fb91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -743,7 +743,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
  }

  test("SPARK-24500: create union with stream of children") {
-    val df = Union(Stream(
+    val df = Union(LazyList(
      Range(1, 1, 1, 1),
      Range(1, 2, 1, 1)))
    df.queryExecution.executedPlan.execute()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 03e56cb9532..4cb9ae7cbc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -108,7 +108,7 @@ class SortSuite extends SparkPlanTest with SharedSparkSession {
    )
    checkAnswer(
      input.toDF("a", "b", "c"),
-      (child: SparkPlan) => SortExec(Stream($"a".asc, $"b".asc, $"c".asc),
+      (child: SparkPlan) => SortExec(LazyList($"a".asc, $"b".asc, $"c".asc),
        global = true, child = child),
      input.sortBy(t => (t._1, t._2, t._3)).map(Row.fromTuple),
      sortAnswers = false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 08db88b0224..146a583d773 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -776,14 +776,14 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
    val b = Seq((1, "a")).toDF("key", "value")
    val c = Seq(1).toDF("key")

-    val ab = a.join(b, Stream("key"), "left")
+    val ab = a.join(b, LazyList("key"), "left")
    val abc = ab.join(c, Seq("key"), "left")
    checkAnswer(abc, Row(1, "a"))
  }

  test("SPARK-26680: Stream in groupBy does not cause StackOverflowError") {
-    val groupByCols = Stream(col("key"))
+    val groupByCols = LazyList(col("key"))
    val df = Seq((1, 2), (2, 3), (1, 3)).toDF("key", "value")
      .groupBy(groupByCols: _*)
      .max("value")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org