This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new fba68e6f20b [SPARK-38896][CORE][SQL] Use `tryWithResource` release `LevelDB/RocksDBIterator` resources earlier fba68e6f20b is described below commit fba68e6f20b88779dc5ca78742958952b3d7acf0 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Thu Apr 28 18:51:28 2022 -0500 [SPARK-38896][CORE][SQL] Use `tryWithResource` release `LevelDB/RocksDBIterator` resources earlier ### What changes were proposed in this pull request? Similar to SPARK-38847, this PR aims to release the `LevelDB/RocksDBIterator` resources earlier by using `tryWithResource`. The main changes of this PR are as follows: 1. Use Java try-with-resources and Spark `Utils.tryWithResource` to close the `KVStoreIterator` opened by `LevelDB.view(Class<T> type).iterator` and `RocksDB.view(Class<T> type).iterator`. 2. Introduce 4 new functions in `KVUtils` (`count`, `foreach`, `mapToSeq`, `size`); these functions close the `KVStoreIterator` promptly once iteration finishes. ### Why are the changes needed? Release the `LevelDB/RocksDBIterator` resources earlier. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions (GA) Closes #36237 from LuciferYang/Manual-Close-KVStoreIterator. 
Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../org/apache/spark/util/kvstore/LevelDB.java | 12 +++-- .../org/apache/spark/util/kvstore/RocksDB.java | 12 +++-- .../apache/spark/util/kvstore/DBIteratorSuite.java | 6 ++- .../spark/util/kvstore/LevelDBBenchmark.java | 12 +++-- .../apache/spark/util/kvstore/LevelDBSuite.java | 29 ++++++----- .../spark/util/kvstore/RocksDBBenchmark.java | 13 +++-- .../apache/spark/util/kvstore/RocksDBSuite.java | 29 ++++++----- .../spark/deploy/history/FsHistoryProvider.scala | 15 ++---- .../apache/spark/status/AppStatusListener.scala | 2 +- .../org/apache/spark/status/AppStatusStore.scala | 58 ++++++++++++---------- .../scala/org/apache/spark/status/KVUtils.scala | 37 ++++++++++++++ .../spark/deploy/history/HybridStoreSuite.scala | 18 +++++-- .../spark/status/AppStatusListenerSuite.scala | 3 +- .../spark/sql/diagnostic/DiagnosticStore.scala | 10 ++-- .../ui/HiveThriftServer2AppStatusStore.scala | 8 +-- .../ui/HiveThriftServer2Listener.scala | 3 +- 16 files changed, 170 insertions(+), 97 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 6b28373a480..b50906e2cba 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -270,10 +270,14 @@ public class LevelDB implements KVStore { KVStoreView<T> view = view(klass).index(index); for (Object indexValue : indexValues) { - for (T value: view.first(indexValue).last(indexValue)) { - Object itemKey = naturalIndex.getValue(value); - delete(klass, itemKey); - removed = true; + try (KVStoreIterator<T> iterator = + view.first(indexValue).last(indexValue).closeableIterator()) { + while (iterator.hasNext()) { + T value = iterator.next(); + Object itemKey = naturalIndex.getValue(value); + delete(klass, itemKey); + removed = 
true; + } } } diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java index 7674bc52dc7..d328e5c79d3 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java @@ -303,10 +303,14 @@ public class RocksDB implements KVStore { KVStoreView<T> view = view(klass).index(index); for (Object indexValue : indexValues) { - for (T value: view.first(indexValue).last(indexValue)) { - Object itemKey = naturalIndex.getValue(value); - delete(klass, itemKey); - removed = true; + try (KVStoreIterator<T> iterator = + view.first(indexValue).last(indexValue).closeableIterator()) { + while (iterator.hasNext()) { + T value = iterator.next(); + Object itemKey = naturalIndex.getValue(value); + delete(klass, itemKey); + removed = true; + } } } diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java index ab1e2728585..223f3f93a87 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java @@ -490,11 +490,15 @@ public abstract class DBIteratorSuite { } private KVStoreView<CustomType1> view() throws Exception { + // SPARK-38896: this `view` will be closed in + // the `collect(KVStoreView<CustomType1> view)` method. 
return db.view(CustomType1.class); } private List<CustomType1> collect(KVStoreView<CustomType1> view) throws Exception { - return Arrays.asList(Iterables.toArray(view, CustomType1.class)); + try (KVStoreIterator<CustomType1> iterator = view.closeableIterator()) { + return Lists.newArrayList(iterator); + } } private List<CustomType1> sortBy(Comparator<CustomType1> comp) { diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java index f2a91f916a3..9082e1887bf 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java @@ -197,9 +197,15 @@ public class LevelDBBenchmark { } } - while (it.hasNext()) { - try(Timer.Context ctx = iter.time()) { - it.next(); + try { + while (it.hasNext()) { + try (Timer.Context ctx = iter.time()) { + it.next(); + } + } + } finally { + if (it != null) { + it.close(); } } } diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java index a7a2148c02d..ef0ccd4a639 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Spliterators; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -236,13 +237,14 @@ public class LevelDBSuite { db.write(createCustomType1(i)); } - KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator(); - assertTrue(it.hasNext()); - assertTrue(it.skip(5)); - assertEquals("key5", it.next().key); - assertTrue(it.skip(3)); - assertEquals("key9", it.next().key); - 
assertFalse(it.hasNext()); + try (KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator()) { + assertTrue(it.hasNext()); + assertTrue(it.skip(5)); + assertEquals("key5", it.next().key); + assertTrue(it.skip(3)); + assertEquals("key9", it.next().key); + assertFalse(it.hasNext()); + } } @Test @@ -257,12 +259,15 @@ public class LevelDBSuite { } }); - List<Integer> results = StreamSupport - .stream(db.view(CustomType1.class).index("int").spliterator(), false) - .map(e -> e.num) - .collect(Collectors.toList()); + try (KVStoreIterator<CustomType1> iterator = + db.view(CustomType1.class).index("int").closeableIterator()) { + List<Integer> results = StreamSupport + .stream(Spliterators.spliteratorUnknownSize(iterator, 0), false) + .map(e -> e.num) + .collect(Collectors.toList()); - assertEquals(expected, results); + assertEquals(expected, results); + } } @Test diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBBenchmark.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBBenchmark.java index 4517a47b32f..25930bb1013 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBBenchmark.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBBenchmark.java @@ -196,10 +196,15 @@ public class RocksDBBenchmark { } } } - - while (it.hasNext()) { - try(Timer.Context ctx = iter.time()) { - it.next(); + try { + while (it.hasNext()) { + try (Timer.Context ctx = iter.time()) { + it.next(); + } + } + } finally { + if (it != null) { + it.close(); } } } diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java index 8112cbf04b3..a3ac40efdfb 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java @@ -22,6 +22,7 @@ import java.util.Arrays; import 
java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Spliterators; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -234,13 +235,14 @@ public class RocksDBSuite { db.write(createCustomType1(i)); } - KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator(); - assertTrue(it.hasNext()); - assertTrue(it.skip(5)); - assertEquals("key5", it.next().key); - assertTrue(it.skip(3)); - assertEquals("key9", it.next().key); - assertFalse(it.hasNext()); + try (KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator()) { + assertTrue(it.hasNext()); + assertTrue(it.skip(5)); + assertEquals("key5", it.next().key); + assertTrue(it.skip(3)); + assertEquals("key9", it.next().key); + assertFalse(it.hasNext()); + } } @Test @@ -255,12 +257,15 @@ public class RocksDBSuite { } }); - List<Integer> results = StreamSupport - .stream(db.view(CustomType1.class).index("int").spliterator(), false) - .map(e -> e.num) - .collect(Collectors.toList()); + try (KVStoreIterator<CustomType1> iterator = + db.view(CustomType1.class).index("int").closeableIterator()) { + List<Integer> results = StreamSupport + .stream(Spliterators.spliteratorUnknownSize(iterator, 0), false) + .map(e -> e.num) + .collect(Collectors.toList()); - assertEquals(expected, results); + assertEquals(expected, results); + } } @Test diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index dddb7da617f..e1b4104eb77 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -325,12 +325,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getListing(): Iterator[ApplicationInfo] = { // Return the listing in end time descending order. 
- listing.view(classOf[ApplicationInfoWrapper]) - .index("endTime") - .reverse() - .iterator() - .asScala - .map(_.toApplicationInfo()) + KVUtils.mapToSeq(listing.view(classOf[ApplicationInfoWrapper]) + .index("endTime").reverse())(_.toApplicationInfo()).iterator } override def getApplicationInfo(appId: String): Option[ApplicationInfo] = { @@ -982,14 +978,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // If the number of files is bigger than MAX_LOG_NUM, // clean up all completed attempts per application one by one. - val num = listing.view(classOf[LogInfo]).index("lastProcessed").asScala.size + val num = KVUtils.size(listing.view(classOf[LogInfo]).index("lastProcessed")) var count = num - maxNum if (count > 0) { logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.") - val oldAttempts = listing.view(classOf[ApplicationInfoWrapper]) - .index("oldestAttempt") - .asScala - oldAttempts.foreach { app => + KVUtils.foreach(listing.view(classOf[ApplicationInfoWrapper]).index("oldestAttempt")) { app => if (count > 0) { // Applications may have multiple attempts, some of which may not be completed yet. 
val (toDelete, remaining) = app.attempts.partition(_.info.completed) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 06008988947..add9862c306 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -1276,7 +1276,7 @@ private[spark] class AppStatusListener( private def cleanupStagesWithInMemoryStore(countToDelete: Long): Seq[Array[Int]] = { val stageArray = new ArrayBuffer[StageCompletionTime]() val stageDataCount = new mutable.HashMap[Int, Int]() - kvstore.view(classOf[StageDataWrapper]).forEach { s => + KVUtils.foreach(kvstore.view(classOf[StageDataWrapper])) { s => // Here we keep track of the total number of StageDataWrapper entries for each stage id. // This will be used in cleaning up the RDDOperationGraphWrapper data. if (stageDataCount.contains(s.info.stageId)) { diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index b455850d609..4c0ac5e3192 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -65,15 +65,15 @@ private[spark] class AppStatusStore( } def resourceProfileInfo(): Seq[v1.ResourceProfileInfo] = { - store.view(classOf[ResourceProfileWrapper]).asScala.map(_.rpInfo).toSeq + KVUtils.mapToSeq(store.view(classOf[ResourceProfileWrapper]))(_.rpInfo) } def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = { - val it = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info) + val it = KVUtils.mapToSeq(store.view(classOf[JobDataWrapper]).reverse())(_.info) if (statuses != null && !statuses.isEmpty()) { - it.filter { job => statuses.contains(job.status) }.toSeq + it.filter { job => statuses.contains(job.status) } } else { - it.toSeq + it 
} } @@ -95,9 +95,9 @@ private[spark] class AppStatusStore( } else { base } - filtered.asScala.map(_.info) + KVUtils.mapToSeq(filtered)(_.info) .filter(_.id != FALLBACK_BLOCK_MANAGER_ID.executorId) - .map(replaceExec).toSeq + .map(replaceExec) } private def replaceExec(origin: v1.ExecutorSummary): v1.ExecutorSummary = { @@ -155,7 +155,7 @@ private[spark] class AppStatusStore( } else { base } - filtered.asScala.map(_.info).toSeq + KVUtils.mapToSeq(filtered)(_.info) } def executorSummary(executorId: String): v1.ExecutorSummary = { @@ -177,12 +177,13 @@ private[spark] class AppStatusStore( unsortedQuantiles: Array[Double] = Array.empty, taskStatus: JList[v1.TaskStatus] = List().asJava): Seq[v1.StageData] = { val quantiles = unsortedQuantiles.sorted - val it = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info) + val it = KVUtils.mapToSeq(store.view(classOf[StageDataWrapper]).reverse())(_.info) val ret = if (statuses != null && !statuses.isEmpty()) { - it.filter { s => statuses.contains(s.status) }.toSeq + it.filter { s => statuses.contains(s.status) } } else { - it.toSeq + it } + ret.map { s => newStageData(s, withDetail = details, taskStatus = taskStatus, withSummaries = withSummaries, unsortedQuantiles = quantiles) @@ -195,11 +196,11 @@ private[spark] class AppStatusStore( taskStatus: JList[v1.TaskStatus] = List().asJava, withSummaries: Boolean = false, unsortedQuantiles: Array[Double] = Array.empty[Double]): Seq[v1.StageData] = { - store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId) - .asScala.map { s => - newStageData(s.info, withDetail = details, taskStatus = taskStatus, - withSummaries = withSummaries, unsortedQuantiles = unsortedQuantiles) - }.toSeq + KVUtils.mapToSeq(store.view(classOf[StageDataWrapper]).index("stageId") + .first(stageId).last(stageId)) { s => + newStageData(s.info, withDetail = details, taskStatus = taskStatus, + withSummaries = withSummaries, unsortedQuantiles = unsortedQuantiles) + } } def 
lastStageAttempt(stageId: Int): v1.StageData = { @@ -468,9 +469,9 @@ private[spark] class AppStatusStore( def taskList(stageId: Int, stageAttemptId: Int, maxTasks: Int): Seq[v1.TaskData] = { val stageKey = Array(stageId, stageAttemptId) - val taskDataWrapperIter = store.view(classOf[TaskDataWrapper]).index("stage") - .first(stageKey).last(stageKey).reverse().max(maxTasks).asScala - constructTaskDataList(taskDataWrapperIter).reverse + val taskDataWrapperSeq = KVUtils.viewToSeq(store.view(classOf[TaskDataWrapper]).index("stage") + .first(stageKey).last(stageKey).reverse().max(maxTasks)) + constructTaskDataList(taskDataWrapperSeq).reverse } def taskList( @@ -511,20 +512,22 @@ private[spark] class AppStatusStore( } val ordered = if (ascending) indexed else indexed.reverse() - val taskDataWrapperIter = if (statuses != null && !statuses.isEmpty) { + val taskDataWrapperSeq = if (statuses != null && !statuses.isEmpty) { val statusesStr = statuses.asScala.map(_.toString).toSet - ordered.asScala.filter(s => statusesStr.contains(s.status)).slice(offset, offset + length) + KVUtils.viewToSeq(ordered, offset, offset + length)(s => statusesStr.contains(s.status)) } else { - ordered.skip(offset).max(length).asScala + KVUtils.viewToSeq(ordered.skip(offset).max(length)) } - constructTaskDataList(taskDataWrapperIter) + constructTaskDataList(taskDataWrapperSeq) } def executorSummary(stageId: Int, attemptId: Int): Map[String, v1.ExecutorStageSummary] = { val stageKey = Array(stageId, attemptId) - store.view(classOf[ExecutorStageSummaryWrapper]).index("stage").first(stageKey).last(stageKey) - .asScala.map { exec => (exec.executorId -> exec.info) }.toMap + KVUtils.mapToSeq(store.view(classOf[ExecutorStageSummaryWrapper]) + .index("stage").first(stageKey).last(stageKey)) { exec => + (exec.executorId -> exec.info) + }.toMap } def speculationSummary(stageId: Int, attemptId: Int): Option[v1.SpeculationStageSummary] = { @@ -533,9 +536,10 @@ private[spark] class AppStatusStore( } def 
rddList(cachedOnly: Boolean = true): Seq[v1.RDDStorageInfo] = { - store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).filter { rdd => - !cachedOnly || rdd.numCachedPartitions > 0 - }.toSeq + KVUtils.mapToSeq(store.view(classOf[RDDStorageInfoWrapper]))(_.info) + .filter { rdd => + !cachedOnly || rdd.numCachedPartitions > 0 + } } /** diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala index e422bf3c05a..f21f10004b2 100644 --- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -93,6 +93,16 @@ private[spark] object KVUtils extends Logging { } } + /** Turns an interval of KVStoreView into a Scala sequence, applying a filter. */ + def viewToSeq[T]( + view: KVStoreView[T], + from: Int, + until: Int)(filter: T => Boolean): Seq[T] = { + Utils.tryWithResource(view.closeableIterator()) { iter => + iter.asScala.filter(filter).slice(from, until).toList + } + } + /** Turns a KVStoreView into a Scala sequence. */ def viewToSeq[T](view: KVStoreView[T]): Seq[T] = { Utils.tryWithResource(view.closeableIterator()) { iter => @@ -100,6 +110,33 @@ private[spark] object KVUtils extends Logging { } } + /** Counts the number of elements in the KVStoreView which satisfy a predicate. */ + def count[T](view: KVStoreView[T])(countFunc: T => Boolean): Int = { + Utils.tryWithResource(view.closeableIterator()) { iter => + iter.asScala.count(countFunc) + } + } + + /** Applies a function f to all values produced by KVStoreView. */ + def foreach[T](view: KVStoreView[T])(foreachFunc: T => Unit): Unit = { + Utils.tryWithResource(view.closeableIterator()) { iter => + iter.asScala.foreach(foreachFunc) + } + } + + /** Maps all values of KVStoreView to new values using a transformation function. 
*/ + def mapToSeq[T, B](view: KVStoreView[T])(mapFunc: T => B): Seq[B] = { + Utils.tryWithResource(view.closeableIterator()) { iter => + iter.asScala.map(mapFunc).toList + } + } + + def size[T](view: KVStoreView[T]): Int = { + Utils.tryWithResource(view.closeableIterator()) { iter => + iter.asScala.size + } + } + private[spark] class MetadataMismatchException extends Exception } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala index 9c06c99e8e6..555759b88e1 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala @@ -29,6 +29,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite import org.apache.spark.status.KVUtils._ import org.apache.spark.tags.ExtendedLevelDBTest +import org.apache.spark.util.Utils import org.apache.spark.util.kvstore._ abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits { @@ -103,6 +104,13 @@ abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with T } test("test basic iteration") { + + def head[T](view: KVStoreView[T]): T = { + Utils.tryWithResource(view.closeableIterator()) { iter => + assert(iter.hasNext) + iter.next() + } + } val store = createHybridStore() val t1 = createCustomType1(1) @@ -113,11 +121,11 @@ abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with T Seq(false, true).foreach { switch => if (switch) switchHybridStore(store) - assert(store.view(t1.getClass()).iterator().next().id === t1.id) - assert(store.view(t1.getClass()).skip(1).iterator().next().id === t2.id) - assert(store.view(t1.getClass()).skip(1).max(1).iterator().next().id === t2.id) - assert(store.view(t1.getClass()).first(t1.key).max(1).iterator().next().id === t1.id) - 
assert(store.view(t1.getClass()).first(t2.key).max(1).iterator().next().id === t2.id) + assert(head(store.view(t1.getClass)).id === t1.id) + assert(head(store.view(t1.getClass()).skip(1)).id === t2.id) + assert(head(store.view(t1.getClass()).skip(1).max(1)).id === t2.id) + assert(head(store.view(t1.getClass()).first(t1.key).max(1)).id === t1.id) + assert(head(store.view(t1.getClass()).first(t2.key).max(1)).id === t2.id) } } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index f5ccce7f1d9..ec92877ce94 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.status import java.io.File import java.util.{Date, Properties} -import scala.collection.JavaConverters._ import scala.collection.immutable.Map import scala.reflect.{classTag, ClassTag} @@ -1671,7 +1670,7 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter } // check peak executor metric values for each stage and executor - val stageExecSummaries = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.toSeq + val stageExecSummaries = KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper])) stageExecSummaries.foreach { exec => expectedStageValues.get(exec.stageId) match { case Some(stageValue) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala index 236ee104f0e..c13cc8a7f39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala @@ -17,10 +17,9 @@ package org.apache.spark.sql.diagnostic -import scala.collection.JavaConverters._ - import 
com.fasterxml.jackson.annotation.JsonIgnore +import org.apache.spark.status.KVUtils import org.apache.spark.status.KVUtils.KVIndexParam import org.apache.spark.util.kvstore.{KVIndex, KVStore} @@ -32,7 +31,7 @@ import org.apache.spark.util.kvstore.{KVIndex, KVStore} class DiagnosticStore(store: KVStore) { def diagnosticsList(offset: Int, length: Int): Seq[ExecutionDiagnosticData] = { - store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length).asScala.toSeq + KVUtils.viewToSeq(store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length)) } def diagnostic(executionId: Long): Option[ExecutionDiagnosticData] = { @@ -44,11 +43,10 @@ class DiagnosticStore(store: KVStore) { } def adaptiveExecutionUpdates(executionId: Long): Seq[AdaptiveExecutionUpdate] = { + KVUtils.viewToSeq( store.view(classOf[AdaptiveExecutionUpdate]) .index("updateTime") - .parent(executionId) - .asScala - .toSeq + .parent(executionId)) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala index 92c7feaf646..58520854936 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala @@ -17,10 +17,10 @@ package org.apache.spark.sql.hive.thriftserver.ui -import com.fasterxml.jackson.annotation.JsonIgnore -import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import com.fasterxml.jackson.annotation.JsonIgnore + import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState import org.apache.spark.status.KVUtils import org.apache.spark.status.KVUtils.KVIndexParam @@ -41,7 +41,7 @@ class HiveThriftServer2AppStatusStore(store: KVStore) { } def 
getOnlineSessionNum: Int = { - store.view(classOf[SessionInfo]).asScala.count(_.finishTimestamp == 0) + KVUtils.count(store.view(classOf[SessionInfo]))(_.finishTimestamp == 0) } def getSession(sessionId: String): Option[SessionInfo] = { @@ -66,7 +66,7 @@ class HiveThriftServer2AppStatusStore(store: KVStore) { * cancellations and count all statements that have not been closed so far. */ def getTotalRunning: Int = { - store.view(classOf[ExecutionInfo]).asScala.count(_.isExecutionActive) + KVUtils.count(store.view(classOf[ExecutionInfo]))(_.isExecutionActive) } def getSessionCount: Long = { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala index 7b2da6970fb..5ccc72c7782 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala @@ -101,7 +101,8 @@ private[thriftserver] class HiveThriftServer2Listener( // Execution end event (Refer SPARK-27019). To handle that situation, if occurs in // Thriftserver, following code will take care. Here will come only if JobStart event comes // after Execution End event. - val storeExecInfo = kvstore.view(classOf[ExecutionInfo]).asScala.filter(_.groupId == groupId) + val storeExecInfo = KVUtils.viewToSeq( + kvstore.view(classOf[ExecutionInfo]), Int.MaxValue)(_.groupId == groupId) storeExecInfo.foreach { exec => val liveExec = getOrCreateExecution(exec.execId, exec.statement, exec.sessionId, exec.startTimestamp, exec.userName) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org