This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 309c65a [SPARK-38337][CORE][SQL][DSTREAM][MLLIB] Replace `toIterator` with `iterator` for `IterableLike`/`IterableOnce` to cleanup deprecated api usage
309c65a is described below
commit 309c65a353489b70bf5135a01d1f159328eaa3f3
Author: yangjie01 <[email protected]>
AuthorDate: Mon Feb 28 09:59:05 2022 +0900
[SPARK-38337][CORE][SQL][DSTREAM][MLLIB] Replace `toIterator` with `iterator` for `IterableLike`/`IterableOnce` to cleanup deprecated api usage
### What changes were proposed in this pull request?
In Scala 2.12, `IterableLike.toIterator` is marked with `@deprecatedOverriding`:
```scala
deprecatedOverriding("toIterator should stay consistent with iterator for
all Iterables: override iterator instead.", "2.11.0")
override def toIterator: Iterator[A] = iterator
```
In Scala 2.13, `IterableOnce.toIterator` is marked as `@deprecated`:
```scala
deprecated("Use .iterator instead of .toIterator", "2.13.0")
`inline` final def toIterator: Iterator[A] = iterator
```
This PR replaces `toIterator` with `iterator`, as recommended by the Scaladoc shown above. Since `toIterator` simply delegates to `iterator`, the replacement is behavior-preserving.
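As a minimal illustration of the mechanical change (the object and method names below are hypothetical, not taken from this patch), the deprecated call and its replacement look like this:
```scala
import scala.collection.mutable

object ToIteratorMigration {
  // Before (deprecated since Scala 2.13, flagged when compiling with -deprecation):
  //   def keys(m: mutable.Map[String, Int]): Iterator[String] = m.toIterator.map(_._1)

  // After: `iterator` is the non-deprecated equivalent and yields the same elements.
  def keys(m: mutable.Map[String, Int]): Iterator[String] = m.iterator.map(_._1)

  def main(args: Array[String]): Unit = {
    val m = mutable.Map("a" -> 1, "b" -> 2)
    keys(m).foreach(println) // prints each key on its own line
  }
}
```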
### Why are the changes needed?
Clean up deprecated API usage.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA (GitHub Actions).
Closes #35665 from LuciferYang/toIterator-is-deprecated.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 4 ++--
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 +-
.../main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala | 4 ++--
.../main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala | 2 +-
core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +-
.../main/scala/org/apache/spark/storage/memory/MemoryStore.scala | 2 +-
.../test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala | 2 +-
.../org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala | 2 +-
.../org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala | 4 ++--
.../org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala | 2 +-
core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 2 +-
.../scala/org/apache/spark/ml/source/image/ImageFileFormat.scala | 2 +-
.../scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala | 2 +-
.../src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala | 2 +-
mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 4 ++--
.../scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala | 2 +-
.../org/apache/spark/sql/catalyst/expressions/objects/objects.scala | 2 +-
.../scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala | 2 +-
.../scala/org/apache/spark/sql/catalyst/optimizer/objects.scala | 2 +-
.../org/apache/spark/sql/catalyst/util/FailureSafeParser.scala | 2 +-
.../scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala | 2 +-
.../main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala | 2 +-
.../test/scala/org/apache/spark/sql/catalyst/SQLKeywordSuite.scala | 2 +-
.../catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala | 2 +-
.../scala/org/apache/spark/sql/execution/command/commands.scala | 4 ++--
.../src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 4 ++--
.../org/apache/spark/sql/execution/datasources/FileScanRDD.scala | 2 +-
.../org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala | 2 +-
.../sql/execution/datasources/v2/FilePartitionReaderFactory.scala | 4 ++--
.../apache/spark/sql/execution/datasources/v2/V2CommandExec.scala | 2 +-
.../spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala | 2 +-
.../org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala | 4 ++--
.../org/apache/spark/sql/execution/datasources/FileIndexSuite.scala | 2 +-
.../org/apache/spark/sql/execution/joins/HashedRelationSuite.scala | 2 +-
.../main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala | 2 +-
.../main/scala/org/apache/spark/streaming/util/RawTextHelper.scala | 6 +++---
.../test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala | 4 ++--
.../test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 4 ++--
.../org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 4 ++--
39 files changed, 52 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 9835695..e6ed469 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1177,7 +1177,7 @@ private[spark] class MapOutputTrackerMaster(
override def getMapSizesForMergeResult(
shuffleId: Int,
partitionId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
= {
- Seq.empty.toIterator
+ Seq.empty.iterator
}
// This method is only called in local-mode. Since push based shuffle won't
be
@@ -1186,7 +1186,7 @@ private[spark] class MapOutputTrackerMaster(
shuffleId: Int,
partitionId: Int,
chunkTracker: RoaringBitmap): Iterator[(BlockManagerId, Seq[(BlockId,
Long, Int)])] = {
- Seq.empty.toIterator
+ Seq.empty.iterator
}
// This method is only called in local-mode.
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 6d4dc3d..6dc9e71 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -245,7 +245,7 @@ private[spark] object PythonRDD extends Logging {
out.writeInt(1)
// Write the next object and signal end of data for this iteration
- writeIteratorToStream(partitionArray.toIterator, out)
+ writeIteratorToStream(partitionArray.iterator, out)
out.writeInt(SpecialLengths.END_OF_DATA_SECTION)
out.flush()
} else {
diff --git
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 6143321..2b4d860 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -76,13 +76,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends
WebUIPage("") {
private def formatMasterResourcesInUse(aliveWorkers: Array[WorkerInfo]):
String = {
val totalInfo = aliveWorkers.map(_.resourcesInfo)
- .flatMap(_.toIterator)
+ .flatMap(_.iterator)
.groupBy(_._1) // group by resource name
.map { case (rName, rInfoArr) =>
rName -> rInfoArr.map(_._2.addresses.size).sum
}
val usedInfo = aliveWorkers.map(_.resourcesInfoUsed)
- .flatMap(_.toIterator)
+ .flatMap(_.iterator)
.groupBy(_._1) // group by resource name
.map { case (rName, rInfoArr) =>
rName -> rInfoArr.map(_._2.addresses.size).sum
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
index d20f3ed..f479e5e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
@@ -117,7 +117,7 @@ private[scheduler] class TaskSetExcludelist(
// over the limit, exclude this task from the entire host.
val execsWithFailuresOnNode =
nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
execsWithFailuresOnNode += exec
- val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
+ val failuresOnHost = execsWithFailuresOnNode.iterator.flatMap { exec =>
execToFailures.get(exec).map { failures =>
// We count task attempts here, not the number of unique executors
with failures. This is
// because jobs are aborted based on the number task attempts; if we
counted unique
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 7ae57f7..abd0beb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1872,7 +1872,7 @@ private[spark] class BlockManager(
serializerManager.dataSerializeStream(
blockId,
out,
- elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
+ elements.iterator)(info.classTag.asInstanceOf[ClassTag[T]])
}
case Right(bytes) =>
diskStore.putBytes(blockId, bytes)
diff --git
a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 1d3543e..144d8cf 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -305,7 +305,7 @@ private[spark] class MemoryStore(
val unrolledIterator = if (valuesHolder.vector != null) {
valuesHolder.vector.iterator
} else {
- valuesHolder.arrayValues.toIterator
+ valuesHolder.arrayValues.iterator
}
Left(new PartiallyUnrolledIterator(
diff --git
a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
index 16a92f5..7875cbc 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
@@ -138,5 +138,5 @@ class RDDOperationScopeSuite extends SparkFunSuite with
BeforeAndAfter {
private class MyCoolRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
override def getPartitions: Array[Partition] = Array.empty
- override def compute(p: Partition, context: TaskContext): Iterator[Int] = { Nil.toIterator }
+ override def compute(p: Partition, context: TaskContext): Iterator[Int] = { Nil.iterator }
}
diff --git
a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
index d964b28..56b8e0b 100644
---
a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
+++
b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -111,7 +111,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite
with LocalSparkContext
val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId)
(shuffleBlockId, byteOutputStream.size().toLong, mapId)
}
- Seq((localBlockManagerId, shuffleBlockIdsAndSizes)).toIterator
+ Seq((localBlockManagerId, shuffleBlockIdsAndSizes)).iterator
}
// Create a mocked shuffle handle to pass into HashShuffleReader.
diff --git
a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
index 6c13c7c..9e52b5e 100644
---
a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
+++
b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
@@ -103,7 +103,7 @@ class SortShuffleWriterSuite
mapId = 2,
context,
shuffleExecutorComponents)
- writer.write(records.toIterator)
+ writer.write(records.iterator)
writer.stop(success = true)
val dataFile = shuffleBlockResolver.getDataFile(shuffleId, 2)
val writeMetrics = context.taskMetrics().shuffleWriteMetrics
@@ -160,7 +160,7 @@ class SortShuffleWriterSuite
context,
new LocalDiskShuffleExecutorComponents(
conf, shuffleBlockResolver._blockManager, shuffleBlockResolver))
- writer.write(records.toIterator)
+ writer.write(records.iterator)
val sorterMethod = PrivateMethod[ExternalSorter[_, _,
_]](Symbol("sorter"))
val sorter = writer.invokePrivate(sorterMethod())
val expectSpillSize = if (doSpill) records.size else 0
diff --git
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index fdaf1f8..e6f0525 100644
---
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -201,7 +201,7 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
transfer,
blockManager.getOrElse(createMockBlockManager()),
mapOutputTracker,
- blocksByAddress.toIterator,
+ blocksByAddress.iterator,
(_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in,
_)).getOrElse(in),
maxBytesInFlight,
maxReqsInFlight,
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 62cd819..973c098 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -464,7 +464,7 @@ class UtilsSuite extends SparkFunSuite with
ResetSystemProperties with Logging {
test("get iterator size") {
val empty = Seq[Int]()
- assert(Utils.getIteratorSize(empty.toIterator) === 0L)
+ assert(Utils.getIteratorSize(empty.iterator) === 0L)
val iterator = Iterator.range(0, 5)
assert(Utils.getIteratorSize(iterator) === 5L)
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
index 868056b..0995df5 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
@@ -82,7 +82,7 @@ private[image] class ImageFileFormat extends FileFormat with
DataSourceRegister
}
val resultOpt = ImageSchema.decode(origin, bytes)
val filteredResult = if (imageSourceOptions.dropInvalid) {
- resultOpt.toIterator
+ resultOpt.iterator
} else {
Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
}
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
index cdb8431..cbe2776 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
@@ -78,7 +78,7 @@ private[evaluation] object AreaUnderCurve {
* @param curve an iterator over ordered 2D points stored in pairs
representing a curve
*/
def of(curve: Iterable[(Double, Double)]): Double = {
- curve.toIterator.sliding(2).withPartial(false).aggregate(0.0)(
+ curve.iterator.sliding(2).withPartial(false).aggregate(0.0)(
seqop = (auc: Double, points: Seq[(Double, Double)]) => auc +
trapezoid(points),
combop = _ + _
)
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
index 659f875..30c1a89 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
@@ -67,7 +67,7 @@ private[fpm] class LocalPrefixSpan(
count >= minCount
}.sorted
// project and recursively call genFreqPatterns
- freqItems.toIterator.flatMap { case (item, count) =>
+ freqItems.iterator.flatMap { case (item, count) =>
val newPrefix = prefix :+ item
Iterator.single((newPrefix, count)) ++ {
val projected = postfixes.map(_.project(item)).filter(_.nonEmpty)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index cd71aac3..6f71801 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -220,7 +220,7 @@ object PrefixSpan extends Logging {
data.flatMap { itemsets =>
val uniqItems = mutable.Set.empty[Item]
itemsets.foreach(set => uniqItems ++= set)
- uniqItems.toIterator.map((_, 1L))
+ uniqItems.iterator.map((_, 1L))
}.reduceByKey(_ + _).filter { case (_, count) =>
count >= minCount
}.sortBy(-_._2).map(_._1).collect()
@@ -478,7 +478,7 @@ object PrefixSpan extends Logging {
}
i += 1
}
- prefixes.toIterator
+ prefixes.iterator
}
/** Tests whether this postfix is non-empty. */
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index c8ef71e..9d24ae4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -591,7 +591,7 @@ case class GetViewColumnByNameAndOrdinal(
override def dataType: DataType = throw new UnresolvedException("dataType")
override def nullable: Boolean = throw new UnresolvedException("nullable")
override lazy val resolved = false
- override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).toIterator
+ override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).iterator
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 68a55f7..4599c2a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -828,7 +828,7 @@ case class MapObjects private(
private def executeFuncOnCollection(inputCollection: Seq[_]): Iterator[_] = {
val row = new GenericInternalRow(1)
- inputCollection.toIterator.map { element =>
+ inputCollection.iterator.map { element =>
row.update(0, element)
lambdaFunction.eval(row)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 6a63118..d08773d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -100,7 +100,7 @@ private[sql] class JsonInferSchema(options: JSONOptions)
extends Serializable {
wrappedCharException.initCause(e)
handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord,
wrappedCharException)
}
- }.reduceOption(typeMerger).toIterator
+ }.reduceOption(typeMerger).iterator
}
// Here we manually submit a fold-like Spark job, so that we can set the
SQLConf when running
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
index c347a2e..82aef32 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
@@ -186,7 +186,7 @@ object ObjectSerializerPruning extends Rule[LogicalPlan] {
serializer: NamedExpression,
prunedDataType: DataType): NamedExpression = {
val prunedStructTypes = collectStructType(prunedDataType,
ArrayBuffer.empty[StructType])
- .toIterator
+ .iterator
def transformer: PartialFunction[Expression, Expression] = {
case m: ExternalMapToCatalyst =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index ab7c931..5a9e52a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -57,7 +57,7 @@ class FailureSafeParser[IN](
def parse(input: IN): Iterator[InternalRow] = {
try {
- rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+ rawParser.apply(input).iterator.map(row => toResultRow(Some(row), () => null))
} catch {
case e: BadRecordException => mode match {
case PermissiveMode =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
index 812d5de..57fecb7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
@@ -43,7 +43,7 @@ class StringKeyHashMap[T](normalizer: (String) => String) {
def remove(key: String): Option[T] = base.remove(normalizer(key))
- def iterator: Iterator[(String, T)] = base.toIterator
+ def iterator: Iterator[(String, T)] = base.iterator
def clear(): Unit = base.clear()
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 0d0f7a0..4ad0337 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -44,7 +44,7 @@ object StringUtils extends Logging {
* @return the equivalent Java regular expression of the pattern
*/
def escapeLikeRegex(pattern: String, escapeChar: Char): String = {
- val in = pattern.toIterator
+ val in = pattern.iterator
val out = new StringBuilder()
def fail(message: String) = throw
QueryCompilationErrors.invalidPatternError(pattern, message)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SQLKeywordSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SQLKeywordSuite.scala
index 45f8862..e9d3015 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SQLKeywordSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/SQLKeywordSuite.scala
@@ -54,7 +54,7 @@ trait SQLKeywordUtils extends SparkFunSuite with SQLHelper {
val default = (_: String) => Nil
var startTagFound = false
var parseFinished = false
- val lineIter = sqlSyntaxDefs.toIterator
+ val lineIter = sqlSyntaxDefs.iterator
while (!parseFinished && lineIter.hasNext) {
val line = lineIter.next()
if (line.trim.startsWith(startTag)) {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
index dd67a61..c13cb33 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
@@ -206,7 +206,7 @@ class GenerateUnsafeRowJoinerSuite extends SparkFunSuite {
if (actualFixedLength !== expectedFixedLength) {
actualFixedLength.grouped(8)
.zip(expectedFixedLength.grouped(8))
- .zip(mergedSchema.fields.toIterator)
+ .zip(mergedSchema.fields.iterator)
.foreach {
case ((actual, expected), field) =>
assert(actual === expected, s"Fixed length sections are not equal
for field $field")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index eed8e03..c21f330 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -83,7 +83,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends
LeafExecNode {
override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
- override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.toIterator
+ override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.iterator
override def executeTake(limit: Int): Array[InternalRow] =
sideEffectResult.take(limit).toArray
@@ -124,7 +124,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand,
child: SparkPlan)
override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
- override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.toIterator
+ override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.iterator
override def executeTake(limit: Int): Array[InternalRow] =
sideEffectResult.take(limit).toArray
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 295838e..5e2a8c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -469,7 +469,7 @@ case class AlterTableAddPartitionCommand(
// Also the request to metastore times out when adding lot of partitions
in one shot.
// we should split them into smaller batches
val batchSize = conf.getConf(SQLConf.ADD_PARTITION_BATCH_SIZE)
- parts.toIterator.grouped(batchSize).foreach { batch =>
+ parts.iterator.grouped(batchSize).foreach { batch =>
catalog.createPartitions(table.identifier, batch, ignoreIfExists =
ifNotExists)
}
@@ -772,7 +772,7 @@ case class RepairTableCommand(
// we should split them into smaller batches. Since Hive client is not
thread safe, we cannot
// do this in parallel.
val batchSize = spark.conf.get(SQLConf.ADD_PARTITION_BATCH_SIZE)
- partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
+ partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
val parts = batch.map { case (spec, location) =>
val params = partitionStats.get(location.toString).map {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index ccef75c..20c393a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -93,7 +93,7 @@ class FileScanRDD(
inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
}
- private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+ private[this] val files = split.asInstanceOf[FilePartition].files.iterator
private[this] var currentFile: PartitionedFile = null
private[this] var currentIterator: Iterator[Object] = null
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 684bab5..a96f77b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -147,7 +147,7 @@ object OrcUtils extends Logging {
: Option[StructType] = {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
- files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+ files.iterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
case Some(schema) =>
logDebug(s"Reading schema from file $files, got Hive schema string:
$schema")
toCatalystSchema(schema)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
index 5e16022..da4f9e8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
@@ -26,7 +26,7 @@ abstract class FilePartitionReaderFactory extends
PartitionReaderFactory {
override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
assert(partition.isInstanceOf[FilePartition])
val filePartition = partition.asInstanceOf[FilePartition]
- val iter = filePartition.files.toIterator.map { file =>
+ val iter = filePartition.files.iterator.map { file =>
PartitionedFileReader(file, buildReader(file))
}
new FilePartitionReader[InternalRow](iter)
@@ -35,7 +35,7 @@ abstract class FilePartitionReaderFactory extends
PartitionReaderFactory {
override def createColumnarReader(partition: InputPartition):
PartitionReader[ColumnarBatch] = {
assert(partition.isInstanceOf[FilePartition])
val filePartition = partition.asInstanceOf[FilePartition]
- val iter = filePartition.files.toIterator.map { file =>
+ val iter = filePartition.files.iterator.map { file =>
PartitionedFileReader(file, buildColumnarReader(file))
}
new FilePartitionReader[ColumnarBatch](iter)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2CommandExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2CommandExec.scala
index fee9137..31e4a77 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2CommandExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2CommandExec.scala
@@ -48,7 +48,7 @@ abstract class V2CommandExec extends SparkPlan {
*/
override def executeCollect(): Array[InternalRow] = result.toArray
- override def executeToIterator(): Iterator[InternalRow] = result.toIterator
+ override def executeToIterator(): Iterator[InternalRow] = result.iterator
override def executeTake(limit: Int): Array[InternalRow] =
result.take(limit).toArray
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 4de35b9..23b5b61 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -356,7 +356,7 @@ case class BroadcastNestedLoopJoinExec(
i += 1
}
}
- Seq(matched).toIterator
+ Seq(matched).iterator
}
matchedBuildRows.fold(new BitSet(relation.value.length))(_ | _)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index a5ac2d5..e876e9d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1377,7 +1377,7 @@ class ArrowConvertersSuite extends SharedSparkSession {
val schema = StructType(Seq(StructField("int", IntegerType, nullable =
true)))
val ctx = TaskContext.empty()
- val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx)
+ val batchIter = ArrowConverters.toBatchIterator(inputRows.iterator, schema, 5, null, ctx)
val outputRowIter = ArrowConverters.fromBatchIterator(batchIter, schema,
null, ctx)
var count = 0
@@ -1398,7 +1398,7 @@ class ArrowConvertersSuite extends SharedSparkSession {
val schema = StructType(Seq(StructField("int", IntegerType, nullable =
true)))
val ctx = TaskContext.empty()
- val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx)
+ val batchIter = ArrowConverters.toBatchIterator(inputRows.iterator, schema, 5, null, ctx)
// Write batches to Arrow stream format as a byte array
val out = new ByteArrayOutputStream()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 690623e..08ddc67 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -488,7 +488,7 @@ class FileIndexSuite extends SharedSparkSession {
new Path("file")), Array(new BlockLocation()))
)
when(dfs.listLocatedStatus(path)).thenReturn(new
RemoteIterator[LocatedFileStatus] {
- val iter = statuses.toIterator
+ val iter = statuses.iterator
override def hasNext: Boolean = iter.hasNext
override def next(): LocatedFileStatus = iter.next
})
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index d5b7ed6..2462fe3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -629,7 +629,7 @@ class HashedRelationSuite extends SharedSparkSession {
test("EmptyHashedRelation override methods behavior test") {
val buildKey = Seq(BoundReference(0, LongType, false))
- val hashed = HashedRelation(Seq.empty[InternalRow].toIterator, buildKey, 1, mm)
+ val hashed = HashedRelation(Seq.empty[InternalRow].iterator, buildKey, 1, mm)
assert(hashed == EmptyHashedRelation)
val key = InternalRow(1L)
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 1a5f47b..f6a85c4 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -93,7 +93,7 @@ private[hive] object OrcFileOperator extends Logging {
: Option[StructType] = {
// Take the first file where we can open a valid reader if we can find
one. Otherwise just
// return None to indicate we can't infer the schema.
- paths.toIterator.map(getFileReader(_, conf, ignoreCorruptFiles)).collectFirst {
+ paths.iterator.map(getFileReader(_, conf, ignoreCorruptFiles)).collectFirst {
case Some(reader) =>
val readerInspector =
reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index e207dab..3263f12a 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -47,11 +47,11 @@ object RawTextHelper {
i += 1
}
}
- map.toIterator.map {
+ map.iterator.map {
case (k, v) => (k, v)
}
}
- map.toIterator.map{case (k, v) => (k, v)}
+ map.iterator.map{case (k, v) => (k, v)}
}
/**
@@ -89,7 +89,7 @@ object RawTextHelper {
}
}
}
- taken.toIterator
+ taken.iterator
}
/**
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
index 0576bf5..dad324b 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
@@ -90,7 +90,7 @@ class DStreamClosureSuite extends SparkFunSuite with
LocalStreamingContext with
ds.filter { _ => return; true }
}
private def testMapPartitions(ds: DStream[Int]): Unit =
expectCorrectException {
- ds.mapPartitions { _ => return; Seq.empty.toIterator }
+ ds.mapPartitions { _ => return; Seq.empty.iterator }
}
private def testReduce(ds: DStream[Int]): Unit = expectCorrectException {
ds.reduce { case (_, _) => return; 1 }
@@ -153,7 +153,7 @@ class DStreamClosureSuite extends SparkFunSuite with
LocalStreamingContext with
}
private def testUpdateStateByKey(ds: DStream[(Int, Int)]): Unit = {
val updateF1 = (_: Seq[Int], _: Option[Int]) => { return; Some(1) }
- val updateF2 = (_: Iterator[(Int, Seq[Int], Option[Int])]) => { return; Seq((1, 1)).toIterator }
+ val updateF2 = (_: Iterator[(Int, Seq[Int], Option[Int])]) => { return; Seq((1, 1)).iterator }
val updateF3 = (_: Time, _: Int, _: Seq[Int], _: Option[Int]) => {
return
Option(1)
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 03182ae..174c3ca 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -365,7 +365,7 @@ class InputStreamsSuite extends TestSuiteBase with
BeforeAndAfter {
// Setup data queued into the stream
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val inputIterator = input.toIterator
+ val inputIterator = input.iterator
for (i <- input.indices) {
// Enqueue more than 1 item per tick but they should dequeue one at a
time
inputIterator.take(2).foreach { i =>
@@ -411,7 +411,7 @@ class InputStreamsSuite extends TestSuiteBase with
BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
// Enqueue the first 3 items (one by one), they should be merged in the
next batch
- val inputIterator = input.toIterator
+ val inputIterator = input.iterator
inputIterator.take(3).foreach { i =>
queue.synchronized {
queue += ssc.sparkContext.makeRDD(Seq(i))
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 3bcea1a..a3b5b38 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -363,7 +363,7 @@ abstract class
BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
val blocks = data.grouped(10).toSeq
- storeAndVerify(blocks.map { b => IteratorBlock(b.toIterator) })
+ storeAndVerify(blocks.map { b => IteratorBlock(b.iterator) })
storeAndVerify(blocks.map { b => ArrayBufferBlock(new ArrayBuffer ++= b) })
storeAndVerify(blocks.map { b =>
ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) })
}
@@ -372,7 +372,7 @@ abstract class
BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler):
Unit = {
// Handle error in iterator (e.g. divide-by-zero error)
intercept[Exception] {
- val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+ val iterator = (10 to (-10, -1)).iterator.map { _ / 0 }
receivedBlockHandler.storeBlock(StreamBlockId(1, 1),
IteratorBlock(iterator))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]