This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 61aa65d9f89 [SPARK-38847][CORE][SQL][SS] Introduce a `viewToSeq`
function for `KVUtils` to close `KVStoreIterator` in time
61aa65d9f89 is described below
commit 61aa65d9f897f29813cbbc77b6d0dbad770c8954
Author: yangjie01 <[email protected]>
AuthorDate: Tue Apr 12 09:25:03 2022 -0500
[SPARK-38847][CORE][SQL][SS] Introduce a `viewToSeq` function for `KVUtils`
to close `KVStoreIterator` in time
### What changes were proposed in this pull request?
Many places in Spark convert a `KVStoreView` into a Scala `List` without
closing the underlying `KVStoreIterator`. These iterators are then mainly
recycled by the `finalize()` method implemented in `LevelDB` and `RocksDB`,
which makes `KVStoreIterator` resource recycling unpredictable.
This PR introduces a `viewToSeq` function in `KVUtils`. The function
converts all data in the `KVStoreView` into a Scala `List` and closes the
`KVStoreIterator` promptly.
### Why are the changes needed?
To provide a general function that converts a `KVStoreView` into a Scala
`List` and closes the `KVStoreIterator` promptly.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GitHub Actions (GA).
Closes #36132 from LuciferYang/kvutils-viewToSeq.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../spark/deploy/history/FsHistoryProvider.scala | 26 +++++++---------------
.../deploy/history/HistoryServerDiskManager.scala | 8 +++----
.../apache/spark/status/AppStatusListener.scala | 10 ++++-----
.../org/apache/spark/status/AppStatusStore.scala | 2 +-
.../scala/org/apache/spark/status/KVUtils.scala | 8 +++++++
.../deploy/history/FsHistoryProviderSuite.scala | 10 ++++-----
.../spark/status/AppStatusListenerSuite.scala | 23 +++++++++----------
.../spark/sql/execution/ui/SQLAppStatusStore.scala | 6 ++---
.../execution/ui/StreamingQueryStatusStore.scala | 6 ++---
.../ui/StreamingQueryStatusListener.scala | 2 +-
.../ui/HiveThriftServer2AppStatusStore.scala | 5 +++--
11 files changed, 50 insertions(+), 56 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a9adaed374a..dddb7da617f 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -592,11 +592,9 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
// Only entries with valid applications are cleaned up here. Cleaning up
invalid log
// files is done by the periodic cleaner task.
val stale = listing.synchronized {
- listing.view(classOf[LogInfo])
+ KVUtils.viewToSeq(listing.view(classOf[LogInfo])
.index("lastProcessed")
- .last(newLastScanTime - 1)
- .asScala
- .toList
+ .last(newLastScanTime - 1))
}
stale.filterNot(isProcessing)
.filterNot(info => notStale.contains(info.logPath))
@@ -957,12 +955,10 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
val maxNum = conf.get(MAX_LOG_NUM)
- val expired = listing.view(classOf[ApplicationInfoWrapper])
+ val expired =
KVUtils.viewToSeq(listing.view(classOf[ApplicationInfoWrapper])
.index("oldestAttempt")
.reverse()
- .first(maxTime)
- .asScala
- .toList
+ .first(maxTime))
expired.foreach { app =>
// Applications may have multiple attempts, some of which may not need
to be deleted yet.
val (remaining, toDelete) = app.attempts.partition { attempt =>
@@ -972,13 +968,10 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
}
// Delete log files that don't have a valid application and exceed the
configured max age.
- val stale = listing.view(classOf[LogInfo])
+ val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo])
.index("lastProcessed")
.reverse()
- .first(maxTime)
- .asScala
- .filter { l => l.logType == null || l.logType == LogType.EventLogs }
- .toList
+ .first(maxTime), Int.MaxValue) { l => l.logType == null || l.logType ==
LogType.EventLogs }
stale.filterNot(isProcessing).foreach { log =>
if (log.appId.isEmpty) {
logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
@@ -1080,13 +1073,10 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
// Delete driver log file entries that exceed the configured max age and
// may have been deleted on filesystem externally.
- val stale = listing.view(classOf[LogInfo])
+ val stale = KVUtils.viewToSeq(listing.view(classOf[LogInfo])
.index("lastProcessed")
.reverse()
- .first(maxTime)
- .asScala
- .filter { l => l.logType != null && l.logType == LogType.DriverLogs }
- .toList
+ .first(maxTime), Int.MaxValue) { l => l.logType != null && l.logType ==
LogType.DriverLogs }
stale.filterNot(isProcessing).foreach { log =>
logInfo(s"Deleting invalid driver log ${log.logPath}")
listing.delete(classOf[LogInfo], log.logPath)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
index 72d407d8643..6759d890d0d 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy.history
import java.io.File
import java.util.concurrent.atomic.AtomicLong
-import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
import org.apache.commons.io.FileUtils
@@ -29,6 +28,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.History.HybridStoreDiskBackend.LEVELDB
+import org.apache.spark.status.KVUtils
import org.apache.spark.status.KVUtils._
import org.apache.spark.util.{Clock, Utils}
import org.apache.spark.util.kvstore.KVStore
@@ -78,10 +78,8 @@ private class HistoryServerDiskManager(
// Go through the recorded store directories and remove any that may have
been removed by
// external code.
- val (existences, orphans) = listing
- .view(classOf[ApplicationStoreInfo])
- .asScala
- .toSeq
+ val (existences, orphans) = KVUtils.viewToSeq(listing
+ .view(classOf[ApplicationStoreInfo]))
.partition { info =>
new File(info.path).exists()
}
diff --git
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 35c43b06c28..06008988947 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -1249,8 +1249,8 @@ private[spark] class AppStatusListener(
if (dead > threshold) {
val countToDelete = calculateNumberToRemove(dead, threshold)
- val toDelete =
kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
- .max(countToDelete).first(false).last(false).asScala.toSeq
+ val toDelete =
KVUtils.viewToSeq(kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
+ .max(countToDelete).first(false).last(false))
toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
}
}
@@ -1406,12 +1406,10 @@ private[spark] class AppStatusListener(
}
private def cleanupCachedQuantiles(stageKey: Array[Int]): Unit = {
- val cachedQuantiles = kvstore.view(classOf[CachedQuantile])
+ val cachedQuantiles =
KVUtils.viewToSeq(kvstore.view(classOf[CachedQuantile])
.index("stage")
.first(stageKey)
- .last(stageKey)
- .asScala
- .toList
+ .last(stageKey))
cachedQuantiles.foreach { q =>
kvstore.delete(q.getClass(), q.id)
}
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 03767ee83a9..34155e3e330 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -691,7 +691,7 @@ private[spark] class AppStatusStore(
}
def streamBlocksList(): Seq[StreamBlockData] = {
- store.view(classOf[StreamBlockData]).asScala.toSeq
+ KVUtils.viewToSeq(store.view(classOf[StreamBlockData]))
}
def operationGraphForStage(stageId: Int): RDDOperationGraph = {
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index 7a4b613ac06..e422bf3c05a 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
import org.apache.spark.internal.config.History.HybridStoreDiskBackend
import org.apache.spark.internal.config.History.HybridStoreDiskBackend._
+import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore._
private[spark] object KVUtils extends Logging {
@@ -92,6 +93,13 @@ private[spark] object KVUtils extends Logging {
}
}
+ /** Turns a KVStoreView into a Scala sequence. */
+ def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
+ Utils.tryWithResource(view.closeableIterator()) { iter =>
+ iter.asScala.toList
+ }
+ }
+
private[spark] class MetadataMismatchException extends Exception
}
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index b05b9de68dc..398a220ebd1 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -23,7 +23,6 @@ import java.util.{Date, Locale}
import java.util.concurrent.TimeUnit
import java.util.zip.{ZipInputStream, ZipOutputStream}
-import scala.collection.JavaConverters._
import scala.concurrent.duration._
import com.google.common.io.{ByteStreams, Files}
@@ -50,6 +49,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.security.GroupMappingServiceProvider
import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.KVUtils
import org.apache.spark.status.KVUtils.KVStoreScalaSerializer
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
@@ -684,12 +684,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with
Matchers with Logging {
log3.setLastModified(clock.getTimeMillis())
// This should not trigger any cleanup
provider.cleanDriverLogs()
- provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size
should be(3)
+ KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should
be(3)
// Should trigger cleanup for first file but not second one
clock.setTime(firstFileModifiedTime + TimeUnit.SECONDS.toMillis(maxAge) +
1)
provider.cleanDriverLogs()
- provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size
should be(2)
+ KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should
be(2)
assert(!log1.exists())
assert(log2.exists())
assert(log3.exists())
@@ -700,7 +700,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with
Matchers with Logging {
// Should cleanup the second file but not the third file, as filelength
changed.
clock.setTime(secondFileModifiedTime + TimeUnit.SECONDS.toMillis(maxAge) +
1)
provider.cleanDriverLogs()
- provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size
should be(1)
+ KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should
be(1)
assert(!log1.exists())
assert(!log2.exists())
assert(log3.exists())
@@ -708,7 +708,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with
Matchers with Logging {
// Should cleanup the third file as well.
clock.setTime(secondFileModifiedTime + 2 *
TimeUnit.SECONDS.toMillis(maxAge) + 2)
provider.cleanDriverLogs()
- provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size
should be(0)
+ KVUtils.viewToSeq(provider.listing.view(classOf[LogInfo])).size should
be(0)
assert(!log3.exists())
}
diff --git
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 105b447f9c7..f5ccce7f1d9 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -254,8 +254,8 @@ abstract class AppStatusListenerSuite extends SparkFunSuite
with BeforeAndAfter
assert(stage.info.memoryBytesSpilled === s1Tasks.size * value)
}
- val execs =
store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
- .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
+ val execs =
KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+ .first(key(stages.head)).last(key(stages.head)))
assert(execs.size > 0)
execs.foreach { exec =>
assert(exec.info.memoryBytesSpilled === s1Tasks.size * value / 2)
@@ -272,10 +272,9 @@ abstract class AppStatusListenerSuite extends
SparkFunSuite with BeforeAndAfter
stageAttemptId = stages.head.attemptNumber))
val executorStageSummaryWrappers =
- store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+
KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
.first(key(stages.head))
- .last(key(stages.head))
- .asScala.toSeq
+ .last(key(stages.head)))
assert(executorStageSummaryWrappers.nonEmpty)
executorStageSummaryWrappers.foreach { exec =>
@@ -301,10 +300,9 @@ abstract class AppStatusListenerSuite extends
SparkFunSuite with BeforeAndAfter
stageAttemptId = stages.head.attemptNumber))
val executorStageSummaryWrappersForNode =
- store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+
KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
.first(key(stages.head))
- .last(key(stages.head))
- .asScala.toSeq
+ .last(key(stages.head)))
assert(executorStageSummaryWrappersForNode.nonEmpty)
executorStageSummaryWrappersForNode.foreach { exec =>
@@ -1364,13 +1362,13 @@ abstract class AppStatusListenerSuite extends
SparkFunSuite with BeforeAndAfter
TaskKilled(reason = "Killed"), tasks(1), new ExecutorMetrics, null))
// Ensure killed task metrics are updated
- val allStages =
store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+ val allStages =
KVUtils.viewToSeq(store.view(classOf[StageDataWrapper]).reverse()).map(_.info)
val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
assert(failedStages.size == 1)
assert(failedStages.head.numKilledTasks == 1)
assert(failedStages.head.numCompleteTasks == 1)
- val allJobs =
store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+ val allJobs =
KVUtils.viewToSeq(store.view(classOf[JobDataWrapper]).reverse()).map(_.info)
assert(allJobs.size == 1)
assert(allJobs.head.numKilledTasks == 1)
assert(allJobs.head.numCompletedTasks == 1)
@@ -1427,14 +1425,15 @@ abstract class AppStatusListenerSuite extends
SparkFunSuite with BeforeAndAfter
ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), new
ExecutorMetrics,
null))
- val esummary =
store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+ val esummary =
KVUtils.viewToSeq(store.view(classOf[ExecutorStageSummaryWrapper])).map(_.info)
esummary.foreach { execSummary =>
assert(execSummary.failedTasks === 1)
assert(execSummary.succeededTasks === 1)
assert(execSummary.killedTasks === 0)
}
- val allExecutorSummary =
store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
+ val allExecutorSummary =
+
KVUtils.viewToSeq(store.view(classOf[ExecutorSummaryWrapper])).map(_.info)
assert(allExecutorSummary.size === 2)
allExecutorSummary.foreach { allExecSummary =>
assert(allExecSummary.failedTasks === 1)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 7c3315e3d76..95035c08a2c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.ui
import java.lang.{Long => JLong}
import java.util.Date
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.spark.JobExecutionStatus
+import org.apache.spark.status.KVUtils
import org.apache.spark.status.KVUtils.KVIndexParam
import org.apache.spark.util.kvstore.{KVIndex, KVStore}
@@ -39,11 +39,11 @@ class SQLAppStatusStore(
val listener: Option[SQLAppStatusListener] = None) {
def executionsList(): Seq[SQLExecutionUIData] = {
- store.view(classOf[SQLExecutionUIData]).asScala.toSeq
+ KVUtils.viewToSeq(store.view(classOf[SQLExecutionUIData]))
}
def executionsList(offset: Int, length: Int): Seq[SQLExecutionUIData] = {
-
store.view(classOf[SQLExecutionUIData]).skip(offset).max(length).asScala.toSeq
+
KVUtils.viewToSeq(store.view(classOf[SQLExecutionUIData]).skip(offset).max(length))
}
def execution(executionId: Long): Option[SQLExecutionUIData] = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
index 6a3b4eeb672..b6d27008b85 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
@@ -32,21 +32,21 @@ class StreamingQueryStatusStore(store: KVStore) {
def allQueryUIData: Seq[StreamingQueryUIData] = {
val view =
store.view(classOf[StreamingQueryData]).index("startTimestamp").first(0L)
- KVUtils.viewToSeq(view, Int.MaxValue)(_ => true).map(makeUIData)
+ KVUtils.viewToSeq(view).map(makeUIData)
}
// visible for test
private[sql] def getQueryProgressData(runId: UUID):
Seq[StreamingQueryProgressWrapper] = {
val view = store.view(classOf[StreamingQueryProgressWrapper])
.index("runId").first(runId.toString).last(runId.toString)
- KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ KVUtils.viewToSeq(view)
}
private def makeUIData(summary: StreamingQueryData): StreamingQueryUIData = {
val runId = summary.runId
val view = store.view(classOf[StreamingQueryProgressWrapper])
.index("runId").first(runId).last(runId)
- val recentProgress = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ val recentProgress = KVUtils.viewToSeq(view)
.map(_.progress).sortBy(_.timestamp).toArray
StreamingQueryUIData(summary, recentProgress)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 55ceab245a9..2e6102b01fa 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -55,7 +55,7 @@ private[sql] class StreamingQueryStatusListener(
private def cleanupInactiveQueries(count: Long): Unit = {
val view =
store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
- val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ val inactiveQueries = KVUtils.viewToSeq(view)
val numInactiveQueries = inactiveQueries.size
if (numInactiveQueries <= inactiveQueryStatusRetention) {
return
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
index 54809fe6c80..92c7feaf646 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
+import org.apache.spark.status.KVUtils
import org.apache.spark.status.KVUtils.KVIndexParam
import org.apache.spark.util.kvstore.{KVIndex, KVStore}
@@ -32,11 +33,11 @@ import org.apache.spark.util.kvstore.{KVIndex, KVStore}
class HiveThriftServer2AppStatusStore(store: KVStore) {
def getSessionList: Seq[SessionInfo] = {
- store.view(classOf[SessionInfo]).asScala.toSeq
+ KVUtils.viewToSeq(store.view(classOf[SessionInfo]))
}
def getExecutionList: Seq[ExecutionInfo] = {
- store.view(classOf[ExecutionInfo]).asScala.toSeq
+ KVUtils.viewToSeq(store.view(classOf[ExecutionInfo]))
}
def getOnlineSessionNum: Int = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]