This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 29eb577cefa
[SPARK-45701][SPARK-45684][SPARK-45692][CORE][SQL][SS][ML][K8S] Clean up the
deprecated API usage related to
`mutable.SetOps/c.SeqOps/Iterator/Iterable/IterableOps`
29eb577cefa is described below
commit 29eb577cefabd954f3de2c284a692790d621d0ba
Author: yangjie01 <[email protected]>
AuthorDate: Tue Oct 31 15:07:47 2023 +0800
[SPARK-45701][SPARK-45684][SPARK-45692][CORE][SQL][SS][ML][K8S] Clean up
the deprecated API usage related to
`mutable.SetOps/c.SeqOps/Iterator/Iterable/IterableOps`
### What changes were proposed in this pull request?
This PR cleans up the deprecated API usage related to `SetOps`:
- `--` -> `diff`
- `-` -> `diff`
- `+` -> `union`
- `retain` -> `filterInPlace`
The changes refer to the following deprecated definitions:
```scala
@deprecated("Consider requiring an immutable Set", "2.13.0")
def -- (that: IterableOnce[A]): C = {
  val toRemove = that.iterator.to(immutable.Set)
  fromSpecific(view.filterNot(toRemove))
}
@deprecated("Consider requiring an immutable Set or fall back to Set.diff", "2.13.0")
def - (elem: A): C = diff(Set(elem))
@deprecated("Consider requiring an immutable Set or fall back to Set.union", "2.13.0")
def + (elem: A): C = fromSpecific(new View.Appended(this, elem))
@deprecated("Use filterInPlace instead", "2.13.0")
@inline final def retain(p: A => Boolean): Unit = filterInPlace(p)
```
This PR also cleans up the deprecated API usage related to `SeqOps`:
- `transform` -> `mapInPlace`
- `reverseMap` -> `.reverseIterator.map(f).to(...)`
- `union` -> `concat`
The changes refer to the following deprecated definitions:
```scala
@deprecated("Use `mapInPlace` on an `IndexedSeq` instead", "2.13.0")
@`inline` final def transform(f: A => A): this.type = {
  var i = 0
  val siz = size
  while (i < siz) { this(i) = f(this(i)); i += 1 }
  this
}
@deprecated("Use .reverseIterator.map(f).to(...) instead of .reverseMap(f)", "2.13.0")
def reverseMap[B](f: A => B): CC[B] =
  iterableFactory.from(new View.Map(View.fromIteratorProvider(() => reverseIterator), f))
@deprecated("Use `concat` instead", "2.13.0")
@inline final def union[B >: A](that: Seq[B]): CC[B] = concat(that)
```
This PR also cleans up the deprecated API usage related to
`Iterator/Iterable/IterableOps`. The changes refer to the following:
- trait Iterable
- `toIterable` -> `immutable.ArraySeq.unsafeWrapArray(...)` when the receiver is an `Array`; simply removed when the receiver is already an `Iterable`
```scala
@deprecated("toIterable is internal and will be made protected; its name is similar to `toList` or `toSeq`, but it doesn't copy non-immutable collections", "2.13.7")
final def toIterable: this.type = this
```
- s.c.Iterator
- `.seq` -> removed
```scala
@deprecated("Iterator.seq always returns the iterator itself", "2.13.0")
def seq: this.type = this
```
- s.c.IterableOps
- `toTraversable` -> removed
```scala
@deprecated("toTraversable is internal and will be made protected; its name is similar to `toList` or `toSeq`, but it doesn't copy non-immutable collections", "2.13.0")
final def toTraversable: Traversable[A] = toIterable
```
### Why are the changes needed?
Clean up deprecated Scala API usage
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43575 from LuciferYang/SPARK-45701.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../org/apache/spark/util/ClosureCleaner.scala | 4 +--
.../scala/org/apache/spark/sql/SparkSession.scala | 3 ++-
.../sql/KeyValueGroupedDatasetE2ETestSuite.scala | 2 +-
.../connect/client/GrpcExceptionConverter.scala | 3 ++-
.../org/apache/spark/sql/connect/dsl/package.scala | 4 +--
.../sql/connect/planner/SparkConnectPlanner.scala | 26 +++++++++---------
.../spark/sql/connect/utils/ErrorUtils.scala | 31 +++++++++++-----------
.../spark/storage/BlockReplicationPolicy.scala | 2 +-
.../collection/ExternalAppendOnlyMapSuite.scala | 6 ++---
.../ml/classification/LogisticRegression.scala | 2 +-
.../apache/spark/ml/feature/NormalizerSuite.scala | 4 +--
.../spark/ml/feature/StringIndexerSuite.scala | 4 +--
.../cluster/k8s/ExecutorPodsAllocator.scala | 2 +-
.../cluster/k8s/ExecutorPodsLifecycleManager.scala | 2 +-
.../sql/catalyst/expressions/AttributeSet.scala | 4 +--
.../catalyst/expressions/stringExpressions.scala | 2 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 2 +-
.../catalyst/rules/QueryExecutionMetering.scala | 2 +-
.../apache/spark/sql/execution/command/ddl.scala | 2 +-
.../spark/sql/execution/streaming/memory.scala | 2 +-
.../apache/spark/sql/streaming/StreamSuite.scala | 2 +-
21 files changed, 57 insertions(+), 54 deletions(-)
diff --git
a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index ffa2f0e60b2..5ea3c9afa9c 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -95,13 +95,13 @@ private[spark] object ClosureCleaner extends Logging {
if (cr != null) {
val set = Set.empty[Class[_]]
cr.accept(new InnerClosureFinder(set), 0)
- for (cls <- set -- seen) {
+ for (cls <- set.diff(seen)) {
seen += cls
stack.push(cls)
}
}
}
- (seen - obj.getClass).toList
+ seen.diff(Set(obj.getClass)).toList
}
/** Initializes the accessed fields for outer classes and their super
classes. */
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 2d3e4205da9..969ac017ecb 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -21,6 +21,7 @@ import java.net.URI
import java.util.concurrent.TimeUnit._
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import scala.collection.immutable
import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe.TypeTag
@@ -247,7 +248,7 @@ class SparkSession private[sql] (
proto.SqlCommand
.newBuilder()
.setSql(sqlText)
- .addAllPosArguments(args.map(lit(_).expr).toIterable.asJava)))
+
.addAllPosArguments(immutable.ArraySeq.unsafeWrapArray(args.map(lit(_).expr)).asJava)))
val plan = proto.Plan.newBuilder().setCommand(cmd)
// .toBuffer forces that the iterator is consumed and closed
val responseSeq = client.execute(plan.build()).toBuffer.toSeq
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
index 98a947826e3..91516b0069b 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
@@ -198,7 +198,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest
with SQLHelper {
.groupByKey(v => v / 2)
val values = grouped
.cogroup(otherGrouped) { (k, it, otherIt) =>
- Seq(it.toSeq.size + otherIt.seq.size)
+ Seq(it.toSeq.size + otherIt.size)
}
.collectAsList()
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index f404cfd2e41..b2782442f4a 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.connect.client
import java.time.DateTimeException
+import scala.collection.immutable
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@@ -371,7 +372,7 @@ private[client] object GrpcExceptionConverter {
FetchErrorDetailsResponse.Error
.newBuilder()
.setMessage(message)
- .addAllErrorTypeHierarchy(classes.toIterable.asJava)
+
.addAllErrorTypeHierarchy(immutable.ArraySeq.unsafeWrapArray(classes).asJava)
.build()))
}
}
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 24fa2324f66..5fd1a035385 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -520,7 +520,7 @@ package object dsl {
.setProject(
Project
.newBuilder()
- .addAllExpressions(exprs.toIterable.asJava)
+ .addAllExpressions(exprs.asJava)
.build())
.build()
}
@@ -533,7 +533,7 @@ package object dsl {
Project
.newBuilder()
.setInput(logicalPlan)
- .addAllExpressions(exprs.toIterable.asJava)
+ .addAllExpressions(exprs.asJava)
.build())
.build()
}
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 17c10e63301..ec57909ad14 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.connect.planner
+import scala.collection.immutable
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.Try
@@ -3183,9 +3184,9 @@ class SparkConnectPlanner(
case StreamingQueryManagerCommand.CommandCase.ACTIVE =>
val active_queries = session.streams.active
respBuilder.getActiveBuilder.addAllActiveQueries(
- active_queries
- .map(query => buildStreamingQueryInstance(query))
- .toIterable
+ immutable.ArraySeq
+ .unsafeWrapArray(active_queries
+ .map(query => buildStreamingQueryInstance(query)))
.asJava)
case StreamingQueryManagerCommand.CommandCase.GET_QUERY =>
@@ -3264,16 +3265,15 @@ class SparkConnectPlanner(
.setGetResourcesCommandResult(
proto.GetResourcesCommandResult
.newBuilder()
- .putAllResources(
- session.sparkContext.resources.view
- .mapValues(resource =>
- proto.ResourceInformation
- .newBuilder()
- .setName(resource.name)
- .addAllAddresses(resource.addresses.toIterable.asJava)
- .build())
- .toMap
- .asJava)
+ .putAllResources(session.sparkContext.resources.view
+ .mapValues(resource =>
+ proto.ResourceInformation
+ .newBuilder()
+ .setName(resource.name)
+
.addAllAddresses(immutable.ArraySeq.unsafeWrapArray(resource.addresses).asJava)
+ .build())
+ .toMap
+ .asJava)
.build())
.build())
}
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index f49b81dda8d..741fa97f178 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.connect.utils
import java.util.UUID
import scala.annotation.tailrec
+import scala.collection.immutable
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
@@ -90,21 +91,21 @@ private[connect] object ErrorUtils extends Logging {
if (serverStackTraceEnabled) {
builder.addAllStackTrace(
- currentError.getStackTrace
- .map { stackTraceElement =>
- val stackTraceBuilder =
FetchErrorDetailsResponse.StackTraceElement
- .newBuilder()
- .setDeclaringClass(stackTraceElement.getClassName)
- .setMethodName(stackTraceElement.getMethodName)
- .setLineNumber(stackTraceElement.getLineNumber)
-
- if (stackTraceElement.getFileName != null) {
- stackTraceBuilder.setFileName(stackTraceElement.getFileName)
- }
-
- stackTraceBuilder.build()
- }
- .toIterable
+ immutable.ArraySeq
+ .unsafeWrapArray(currentError.getStackTrace
+ .map { stackTraceElement =>
+ val stackTraceBuilder =
FetchErrorDetailsResponse.StackTraceElement
+ .newBuilder()
+ .setDeclaringClass(stackTraceElement.getClassName)
+ .setMethodName(stackTraceElement.getMethodName)
+ .setLineNumber(stackTraceElement.getLineNumber)
+
+ if (stackTraceElement.getFileName != null) {
+ stackTraceBuilder.setFileName(stackTraceElement.getFileName)
+ }
+
+ stackTraceBuilder.build()
+ })
.asJava)
}
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
index 0bacc34cdfd..893b5605414 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
@@ -67,7 +67,7 @@ object BlockReplicationUtils {
private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
val indices = (n - m + 1 to n).foldLeft(mutable.LinkedHashSet.empty[Int])
{case (set, i) =>
val t = r.nextInt(i) + 1
- if (set.contains(t)) set + i else set + t
+ if (set.contains(t)) set.union(Set(i)) else set.union(Set(t))
}
indices.map(_ - 1).toList
}
diff --git
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index fb1502848ca..59f6e3f2d35 100644
---
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -453,7 +453,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
val first50Keys = for ( _ <- 0 until 50) yield {
val (k, vs) = it.next()
val sortedVs = vs.sorted
- assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+ assert(sortedVs == (0 until 10).map(10 * k + _))
k
}
assert(map.numSpills == 0)
@@ -474,7 +474,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
val next50Keys = for ( _ <- 0 until 50) yield {
val (k, vs) = it.next()
val sortedVs = vs.sorted
- assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+ assert(sortedVs == (0 until 10).map(10 * k + _))
k
}
assert(!it.hasNext)
@@ -506,7 +506,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
val keys = it.map{
case (k, vs) =>
val sortedVs = vs.sorted
- assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+ assert(sortedVs == (0 until 10).map(10 * k + _))
k
}
.toList
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 0efa57e56f1..8b796a65f4f 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -672,7 +672,7 @@ class LogisticRegression @Since("1.2.0") (
denseCoefficientMatrix.foreachActive { case (i, j, v) =>
centers(j) += v
}
- centers.transform(_ / numCoefficientSets)
+ centers.mapInPlace(_ / numCoefficientSets)
denseCoefficientMatrix.foreachActive { case (i, j, v) =>
denseCoefficientMatrix.update(i, j, v - centers(j))
}
diff --git
a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala
b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala
index 1c602cd7d9a..7fa2ad5ee41 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala
@@ -76,7 +76,7 @@ class NormalizerSuite extends MLTest with
DefaultReadWriteTest {
test("Normalization with default parameter") {
val normalizer = new
Normalizer().setInputCol("features").setOutputCol("normalized")
- val dataFrame: DataFrame = data.zip(l2Normalized).seq.toDF("features",
"expected")
+ val dataFrame: DataFrame = data.zip(l2Normalized).toDF("features",
"expected")
testTransformer[(Vector, Vector)](dataFrame, normalizer, "features",
"normalized", "expected") {
case Row(features: Vector, normalized: Vector, expected: Vector) =>
@@ -102,7 +102,7 @@ class NormalizerSuite extends MLTest with
DefaultReadWriteTest {
}
test("Normalization with setter") {
- val dataFrame: DataFrame = data.zip(l1Normalized).seq.toDF("features",
"expected")
+ val dataFrame: DataFrame = data.zip(l1Normalized).toDF("features",
"expected")
val normalizer = new
Normalizer().setInputCol("features").setOutputCol("normalized").setP(1)
testTransformer[(Vector, Vector)](dataFrame, normalizer, "features",
"normalized", "expected") {
diff --git
a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index c8247b9c8f3..99f12eab7d6 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -92,7 +92,7 @@ class StringIndexerSuite extends MLTest with
DefaultReadWriteTest {
val attr = Attribute.fromStructField(rows.head.schema("labelIndex"))
.asInstanceOf[NominalAttribute]
assert(attr.values.get === Array("a", "c", "b"))
- assert(rows.seq === expected.collect().toSeq)
+ assert(rows === expected.collect().toSeq)
}
}
@@ -139,7 +139,7 @@ class StringIndexerSuite extends MLTest with
DefaultReadWriteTest {
val attrSkip = Attribute.fromStructField(rows.head.schema("labelIndex"))
.asInstanceOf[NominalAttribute]
assert(attrSkip.values.get === Array("b", "a"))
- assert(rows.seq === expectedSkip.collect().toSeq)
+ assert(rows === expectedSkip.collect().toSeq)
}
indexer.setHandleInvalid("keep")
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index e678489e100..a4403fb96b2 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -277,7 +277,7 @@ class ExecutorPodsAllocator(
case _ => false
}.keySet
- val newFailedExecutorIds = currentFailedExecutorIds -- failedExecutorIds
+ val newFailedExecutorIds =
currentFailedExecutorIds.diff(failedExecutorIds)
if (newFailedExecutorIds.nonEmpty) {
logWarning(s"${newFailedExecutorIds.size} new failed executors.")
newFailedExecutorIds.foreach { _ =>
failureTracker.registerExecutorFailure() }
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 49bfde98bb8..46129f3c9ea 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -116,7 +116,7 @@ private[spark] class ExecutorPodsLifecycleManager(
// This makes sure that we don't keep growing that set indefinitely, in
case we end up missing
// an update for some pod.
if (inactivatedPods.nonEmpty && snapshots.nonEmpty) {
- inactivatedPods.retain(snapshots.last.executorPods.contains(_))
+ inactivatedPods.filterInPlace(snapshots.last.executorPods.contains(_))
}
// Reconcile the case where Spark claims to know about an executor but the
corresponding pod
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index b059e4f8496..2628afd8923 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -84,11 +84,11 @@ class AttributeSet private (private val baseSet:
mutable.LinkedHashSet[Attribute
/** Returns a new [[AttributeSet]] that contains `elem` in addition to the
current elements. */
def +(elem: Attribute): AttributeSet = // scalastyle:ignore
- new AttributeSet(baseSet + new AttributeEquals(elem))
+ new AttributeSet(baseSet.union(Set(new AttributeEquals(elem))))
/** Returns a new [[AttributeSet]] that does not contain `elem`. */
def -(elem: Attribute): AttributeSet =
- new AttributeSet(baseSet - new AttributeEquals(elem))
+ new AttributeSet(baseSet.diff(Set(new AttributeEquals(elem))))
/** Returns an iterator containing all of the attributes in the set. */
def iterator: Iterator[Attribute] = baseSet.map(_.a).iterator
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index eff63bf0341..563223bb33b 100755
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -2573,7 +2573,7 @@ object Decode {
default = search
}
}
- CaseWhen(branches.seq.toSeq, default)
+ CaseWhen(branches.toSeq, default)
}
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d9d04db9ab0..6d70ad29f87 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1835,7 +1835,7 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
// Reverse the contexts to have them in the same sequence as in the SQL
statement & turn them
// into expressions.
- val expressions = contexts.reverseMap(expression)
+ val expressions = contexts.reverseIterator.map(expression).to(ArrayBuffer)
// Create a balanced tree.
def reduceToExpressionTree(low: Int, high: Int): Expression = high - low
match {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
index 4b7b0079f07..60d82d81df7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
@@ -86,7 +86,7 @@ case class QueryExecutionMetering() {
val colRunTime = "Effective Time / Total Time".padTo(len = 47, "
").mkString
val colNumRuns = "Effective Runs / Total Runs".padTo(len = 47, "
").mkString
- val ruleMetrics = map.toSeq.sortBy(_._2).reverseMap { case (name, time) =>
+ val ruleMetrics = map.toSeq.sortBy(_._2).reverseIterator.map { case (name,
time) =>
val timeEffectiveRun = timeEffectiveRunsMap.get(name)
val numRuns = numRunsMap.get(name)
val numEffectiveRun = numEffectiveRunsMap.get(name)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index a30734abfa7..9dc6d7aab8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -707,7 +707,7 @@ case class RepairTableCommand(
val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
try {
scanPartitions(spark, fs, pathFilter, root, Map(),
table.partitionColumnNames, threshold,
- spark.sessionState.conf.resolver, new
ForkJoinTaskSupport(evalPool)).seq
+ spark.sessionState.conf.resolver, new
ForkJoinTaskSupport(evalPool))
} finally {
evalPool.shutdown()
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 826543fd565..2ff478ef98e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -69,7 +69,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext:
SQLContext) extends Spa
}
def addData(data: A*): OffsetV2 = {
- addData(data.toTraversable)
+ addData(data)
}
def addData(data: IterableOnce[A]): OffsetV2
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index e54ce649736..2174e91cb44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -174,7 +174,7 @@ class StreamSuite extends StreamTest {
try {
query.processAllAvailable()
val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long]
- checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to
10L)).toArray: _*)
+ checkDatasetUnorderly[Long](outputDf, (0L to 10L).concat(0L to 10L):
_*)
} finally {
query.stop()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]