This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1d80d80a56c4 [SPARK-46249][SS] Require instance lock for acquiring
RocksDB metrics to prevent race with background operations
1d80d80a56c4 is described below
commit 1d80d80a56c418f841e282ad753fad6671c3baae
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Tue Dec 5 15:00:08 2023 +0900
[SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to
prevent race with background operations
### What changes were proposed in this pull request?
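Require the DB instance lock to be held when acquiring RocksDB metrics, to
prevent a race with background operations. Metrics are now recorded at commit
time and exposed through `metricsOpt`, which returns None when no metrics have
been recorded for the loaded version.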
### Why are the changes needed?
The changes are needed to avoid a race in which the stateful operator tries to
set storeMetrics after the commit, by which time the DB instance may already
have been closed, aborted, or reloaded.
We have seen a few query failures with the following stack trace for this
reason:
```
org.apache.spark.sql.streaming.StreamingQueryException: Job aborted
due to stage failure: Task 3 in stage 531.0 failed 1 times, most recent
failure: Lost task 3.0 in stage 531.0 (TID 1544)
(ip-10-110-29-251.us-west-2.compute.internal executor driver):
java.lang.NullPointerException
at
org.apache.spark.sql.execution.streaming.state.RocksDB.getDBProperty(RocksDB.scala:838)
at
org.apache.spark.sql.execution.streaming.state.RocksDB.metrics(RocksDB.scala:678)
at
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.metrics(RocksDBStateStoreProvider.scala:137)
at
org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics(statefulOperators.scala:198)
at
org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics$(statefulOperators.scala:197)
at
org.apache.spark.sql.execution.streaming.StateStoreSaveExec.setStoreMetrics(statefulOperators.scala:495)
at
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.close(statefulOperators.scala:626)
at
org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.hashAgg_doAggregateWithKeys_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:498)
at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1743)
at
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:552)
at
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:482)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:557)
at
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:445)
at
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
at
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
at
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
at
com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified existing unit tests
```
[info] Run completed in 1 minute, 31 seconds.
[info] Total number of tests run: 150
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 150, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44165 from anishshri-db/task/SPARK-46249.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 30 +++++-
.../state/RocksDBStateStoreProvider.scala | 103 ++++++++++++---------
.../streaming/state/RocksDBStateStoreSuite.scala | 24 +++--
.../execution/streaming/state/RocksDBSuite.scala | 36 +++----
4 files changed, 112 insertions(+), 81 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 3a42f9e2ccb5..c33a7c472842 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -137,6 +137,11 @@ class RocksDB(
@volatile private var numKeysOnWritingVersion = 0L
@volatile private var fileManagerMetrics =
RocksDBFileManagerMetrics.EMPTY_METRICS
+ // SPARK-46249 - Keep track of recorded metrics per version which can be
used for querying later
+ // Updates and access to recordedMetrics are protected by the DB instance
lock
+ @GuardedBy("acquireLock")
+ @volatile private var recordedMetrics: Option[RocksDBMetrics] = None
+
@GuardedBy("acquireLock")
@volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
@@ -148,6 +153,7 @@ class RocksDB(
def load(version: Long, readOnly: Boolean = false): RocksDB = {
assert(version >= 0)
acquire()
+ recordedMetrics = None
logInfo(s"Loading $version")
try {
if (loadedVersion != version) {
@@ -397,7 +403,8 @@ class RocksDB(
"checkpoint" -> checkpointTimeMs,
"fileSync" -> fileSyncTimeMs
)
- logInfo(s"Committed $newVersion, stats = ${metrics.json}")
+ recordedMetrics = Some(metrics)
+ logInfo(s"Committed $newVersion, stats = ${recordedMetrics.get.json}")
loadedVersion
} catch {
case t: Throwable =>
@@ -495,7 +502,7 @@ class RocksDB(
def getWriteBufferManagerAndCache(): (WriteBufferManager, Cache) =
(writeBufferManager, lruCache)
/** Get current instantaneous statistics */
- def metrics: RocksDBMetrics = {
+ private def metrics: RocksDBMetrics = {
import HistogramType._
val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
@@ -549,6 +556,25 @@ class RocksDB(
nativeOpsMetrics = nativeOpsMetrics.toMap)
}
+ /**
+ * Function to return RocksDB metrics if the recorded metrics are available
and the operator
+ * has reached the commit stage for this state store instance and version.
If not, we return None
+ * @return - Return RocksDBMetrics if available and None otherwise
+ */
+ def metricsOpt: Option[RocksDBMetrics] = {
+ var rocksDBMetricsOpt: Option[RocksDBMetrics] = None
+ try {
+ acquire()
+ rocksDBMetricsOpt = recordedMetrics
+ } catch {
+ case ex: Exception =>
+ logInfo(s"Failed to acquire metrics with exception=$ex")
+ } finally {
+ release()
+ }
+ rocksDBMetricsOpt
+ }
+
private def acquire(): Unit = acquireLock.synchronized {
val newAcquiredThreadInfo = AcquiredThreadInfo()
val waitStartTime = System.currentTimeMillis
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 4254640201c5..9552e2c81bb1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -110,52 +110,65 @@ private[sql] class RocksDBStateStoreProvider
}
override def metrics: StateStoreMetrics = {
- val rocksDBMetrics = rocksDB.metrics
- def commitLatencyMs(typ: String): Long =
rocksDBMetrics.lastCommitLatencyMs.getOrElse(typ, 0L)
- def nativeOpsLatencyMillis(typ: String): Long = {
- rocksDBMetrics.nativeOpsMetrics.get(typ).map(_ * 1000).getOrElse(0)
- }
- def sumNativeOpsLatencyMillis(typ: String): Long = {
- rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.sum /
1000).getOrElse(0)
- }
- def nativeOpsCount(typ: String): Long = {
- rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.count).getOrElse(0)
- }
- def nativeOpsMetrics(typ: String): Long = {
- rocksDBMetrics.nativeOpsMetrics.getOrElse(typ, 0)
- }
+ val rocksDBMetricsOpt = rocksDB.metricsOpt
+
+ if (rocksDBMetricsOpt.isDefined) {
+ val rocksDBMetrics = rocksDBMetricsOpt.get
+
+ def commitLatencyMs(typ: String): Long =
+ rocksDBMetrics.lastCommitLatencyMs.getOrElse(typ, 0L)
+
+ def nativeOpsLatencyMillis(typ: String): Long = {
+ rocksDBMetrics.nativeOpsMetrics.get(typ).map(_ * 1000).getOrElse(0)
+ }
- val stateStoreCustomMetrics = Map[StateStoreCustomMetric, Long](
- CUSTOM_METRIC_SST_FILE_SIZE -> rocksDBMetrics.totalSSTFilesBytes,
- CUSTOM_METRIC_GET_TIME -> sumNativeOpsLatencyMillis("get"),
- CUSTOM_METRIC_PUT_TIME -> sumNativeOpsLatencyMillis("put"),
- CUSTOM_METRIC_GET_COUNT -> nativeOpsCount("get"),
- CUSTOM_METRIC_PUT_COUNT -> nativeOpsCount("put"),
- CUSTOM_METRIC_FLUSH_TIME -> commitLatencyMs("flush"),
- CUSTOM_METRIC_COMMIT_COMPACT_TIME -> commitLatencyMs("compact"),
- CUSTOM_METRIC_CHECKPOINT_TIME -> commitLatencyMs("checkpoint"),
- CUSTOM_METRIC_FILESYNC_TIME -> commitLatencyMs("fileSync"),
- CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied,
- CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied,
- CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused,
- CUSTOM_METRIC_BLOCK_CACHE_MISS ->
nativeOpsMetrics("readBlockCacheMissCount"),
- CUSTOM_METRIC_BLOCK_CACHE_HITS ->
nativeOpsMetrics("readBlockCacheHitCount"),
- CUSTOM_METRIC_BYTES_READ -> nativeOpsMetrics("totalBytesRead"),
- CUSTOM_METRIC_BYTES_WRITTEN -> nativeOpsMetrics("totalBytesWritten"),
- CUSTOM_METRIC_ITERATOR_BYTES_READ ->
nativeOpsMetrics("totalBytesReadThroughIterator"),
- CUSTOM_METRIC_STALL_TIME ->
nativeOpsLatencyMillis("writerStallDuration"),
- CUSTOM_METRIC_TOTAL_COMPACT_TIME ->
sumNativeOpsLatencyMillis("compaction"),
- CUSTOM_METRIC_COMPACT_READ_BYTES ->
nativeOpsMetrics("totalBytesReadByCompaction"),
- CUSTOM_METRIC_COMPACT_WRITTEN_BYTES ->
nativeOpsMetrics("totalBytesWrittenByCompaction"),
- CUSTOM_METRIC_FLUSH_WRITTEN_BYTES ->
nativeOpsMetrics("totalBytesWrittenByFlush"),
- CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE ->
rocksDBMetrics.pinnedBlocksMemUsage
- ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes =>
- Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED ->
bytes)).getOrElse(Map())
-
- StateStoreMetrics(
- rocksDBMetrics.numUncommittedKeys,
- rocksDBMetrics.totalMemUsageBytes,
- stateStoreCustomMetrics)
+ def sumNativeOpsLatencyMillis(typ: String): Long = {
+ rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.sum /
1000).getOrElse(0)
+ }
+
+ def nativeOpsCount(typ: String): Long = {
+ rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.count).getOrElse(0)
+ }
+
+ def nativeOpsMetrics(typ: String): Long = {
+ rocksDBMetrics.nativeOpsMetrics.getOrElse(typ, 0)
+ }
+
+ val stateStoreCustomMetrics = Map[StateStoreCustomMetric, Long](
+ CUSTOM_METRIC_SST_FILE_SIZE -> rocksDBMetrics.totalSSTFilesBytes,
+ CUSTOM_METRIC_GET_TIME -> sumNativeOpsLatencyMillis("get"),
+ CUSTOM_METRIC_PUT_TIME -> sumNativeOpsLatencyMillis("put"),
+ CUSTOM_METRIC_GET_COUNT -> nativeOpsCount("get"),
+ CUSTOM_METRIC_PUT_COUNT -> nativeOpsCount("put"),
+ CUSTOM_METRIC_FLUSH_TIME -> commitLatencyMs("flush"),
+ CUSTOM_METRIC_COMMIT_COMPACT_TIME -> commitLatencyMs("compact"),
+ CUSTOM_METRIC_CHECKPOINT_TIME -> commitLatencyMs("checkpoint"),
+ CUSTOM_METRIC_FILESYNC_TIME -> commitLatencyMs("fileSync"),
+ CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied,
+ CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied,
+ CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused,
+ CUSTOM_METRIC_BLOCK_CACHE_MISS ->
nativeOpsMetrics("readBlockCacheMissCount"),
+ CUSTOM_METRIC_BLOCK_CACHE_HITS ->
nativeOpsMetrics("readBlockCacheHitCount"),
+ CUSTOM_METRIC_BYTES_READ -> nativeOpsMetrics("totalBytesRead"),
+ CUSTOM_METRIC_BYTES_WRITTEN -> nativeOpsMetrics("totalBytesWritten"),
+ CUSTOM_METRIC_ITERATOR_BYTES_READ ->
nativeOpsMetrics("totalBytesReadThroughIterator"),
+ CUSTOM_METRIC_STALL_TIME ->
nativeOpsLatencyMillis("writerStallDuration"),
+ CUSTOM_METRIC_TOTAL_COMPACT_TIME ->
sumNativeOpsLatencyMillis("compaction"),
+ CUSTOM_METRIC_COMPACT_READ_BYTES ->
nativeOpsMetrics("totalBytesReadByCompaction"),
+ CUSTOM_METRIC_COMPACT_WRITTEN_BYTES ->
nativeOpsMetrics("totalBytesWrittenByCompaction"),
+ CUSTOM_METRIC_FLUSH_WRITTEN_BYTES ->
nativeOpsMetrics("totalBytesWrittenByFlush"),
+ CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE ->
rocksDBMetrics.pinnedBlocksMemUsage
+ ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes =>
+ Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED ->
bytes)).getOrElse(Map())
+
+ StateStoreMetrics(
+ rocksDBMetrics.numUncommittedKeys,
+ rocksDBMetrics.totalMemUsageBytes,
+ stateStoreCustomMetrics)
+ } else {
+ logInfo(s"Failed to collect metrics for store_id=$id and
version=$version")
+ StateStoreMetrics(0, 0, Map.empty)
+ }
}
override def hasCommitted: Boolean = state == COMMITTED
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index a6e65825a5bc..3559a10444a4 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -131,19 +131,25 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
}
withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
tryWithProviderResource(newStoreProvider()) { provider =>
- val store = provider.getStore(0)
- // Verify state after updating
- put(store, "a", 0, 1)
- assert(get(store, "a", 0) === Some(1))
- assert(store.commit() === 1)
- provider.doMaintenance()
- assert(store.hasCommitted)
- val storeMetrics = store.metrics
- assert(storeMetrics.numKeys === 1)
+ val store = provider.getStore(0)
+ // Verify state after updating
+ put(store, "a", 0, 1)
+ assert(get(store, "a", 0) === Some(1))
+ assert(store.commit() === 1)
+ provider.doMaintenance()
+ assert(store.hasCommitted)
+ val storeMetrics = store.metrics
+ assert(storeMetrics.numKeys === 1)
+ // SPARK-46249 - In the case of changelog checkpointing, the snapshot
upload happens in
+ // the context of the background maintenance thread. The file manager
metrics are updated
+ // here and will be available as part of the next metrics update. So
we cannot rely on the
+ // file manager metrics to be available here for this version.
+ if (!isChangelogCheckpointingEnabled) {
assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_COPIED) >
0L)
assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_REUSED) ==
0L)
assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_BYTES_COPIED) >
0L)
assert(getCustomMetric(storeMetrics,
CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED) > 0L)
+ }
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index e290f808f560..9ce2137df72c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -922,13 +922,12 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
withTempDir { dir =>
val remoteDir = dir.getCanonicalPath
withDB(remoteDir) { db =>
- verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics)
db.load(0)
db.put("a", "1") // put also triggers a db get
db.get("a") // this is found in-memory writebatch - no get triggered
in db
db.get("b") // key doesn't exists - triggers db get
db.commit()
- verifyMetrics(putCount = 1, getCount = 3, metrics = db.metrics)
+ verifyMetrics(putCount = 1, getCount = 3, metrics = db.metricsOpt.get)
db.load(1)
db.put("b", "2") // put also triggers a db get
@@ -936,7 +935,7 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
db.get("c") // key doesn't exists - triggers db get
assert(iterator(db).toSet === Set(("a", "1"), ("b", "2")))
db.commit()
- verifyMetrics(putCount = 1, getCount = 3, iterCountPositive = true,
db.metrics)
+ verifyMetrics(putCount = 1, getCount = 3, iterCountPositive = true,
db.metricsOpt.get)
}
}
@@ -944,19 +943,18 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
withTempDir { dir =>
val remoteDir = dir.getCanonicalPath
withDB(remoteDir, conf = dbConf.copy(resetStatsOnLoad = false)) { db =>
- verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics)
db.load(0)
db.put("a", "1") // put also triggers a db get
db.commit()
// put and get counts are cumulative
- verifyMetrics(putCount = 1, getCount = 1, metrics = db.metrics)
+ verifyMetrics(putCount = 1, getCount = 1, metrics = db.metricsOpt.get)
db.load(1)
db.put("b", "2") // put also triggers a db get
db.get("a")
db.commit()
// put and get counts are cumulative: existing get=1, put=1: new
get=2, put=1
- verifyMetrics(putCount = 2, getCount = 3, metrics = db.metrics)
+ verifyMetrics(putCount = 2, getCount = 3, metrics = db.metricsOpt.get)
}
}
@@ -974,7 +972,7 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
db.put("b", "25")
db.commit()
- val metrics = db.metrics
+ val metrics = db.metricsOpt.get
assert(metrics.nativeOpsHistograms("compaction").count > 0)
assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") > 0)
assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") > 0)
@@ -1194,13 +1192,10 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
db.put("a", "5")
db.put("b", "5")
- assert(db.metrics.numUncommittedKeys === 2)
- assert(db.metrics.numCommittedKeys === 0)
-
curVersion = db.commit()
- assert(db.metrics.numUncommittedKeys === 2)
- assert(db.metrics.numCommittedKeys === 2)
+ assert(db.metricsOpt.get.numUncommittedKeys === 2)
+ assert(db.metricsOpt.get.numCommittedKeys === 2)
}
// restart with config "trackTotalNumberOfRows = false"
@@ -1208,16 +1203,13 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = false)) {
db =>
db.load(curVersion)
- assert(db.metrics.numUncommittedKeys === -1)
- assert(db.metrics.numCommittedKeys === -1)
-
db.put("b", "7")
db.put("c", "7")
curVersion = db.commit()
- assert(db.metrics.numUncommittedKeys === -1)
- assert(db.metrics.numCommittedKeys === -1)
+ assert(db.metricsOpt.get.numUncommittedKeys === -1)
+ assert(db.metricsOpt.get.numCommittedKeys === -1)
}
// restart with config "trackTotalNumberOfRows = true" again
@@ -1225,19 +1217,13 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = true)) {
db =>
db.load(curVersion)
- assert(db.metrics.numUncommittedKeys === 3)
- assert(db.metrics.numCommittedKeys === 3)
-
db.put("c", "8")
db.put("d", "8")
- assert(db.metrics.numUncommittedKeys === 4)
- assert(db.metrics.numCommittedKeys === 3)
-
curVersion = db.commit()
- assert(db.metrics.numUncommittedKeys === 4)
- assert(db.metrics.numCommittedKeys === 4)
+ assert(db.metricsOpt.get.numUncommittedKeys === 4)
+ assert(db.metricsOpt.get.numCommittedKeys === 4)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]