This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2be20e54a22 [SPARK-44809][SS] Remove unused RocksDB custom metrics for
pause/writeBatch
2be20e54a22 is described below
commit 2be20e54a2222f6cdf64e8486d1910133b43665f
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Wed Aug 16 11:17:04 2023 +0900
[SPARK-44809][SS] Remove unused RocksDB custom metrics for pause/writeBatch
### What changes were proposed in this pull request?
Remove the unused RocksDB custom metrics for pause (`rocksdbCommitPauseLatency`) and writeBatch (`rocksdbCommitWriteBatchLatency`).
### Why are the changes needed?
The pause and writeBatch metrics are unused, so they should no longer be reported among the state operator custom metrics in query progress.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
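Modified the existing test in `RocksDBStateStoreIntegrationSuite` so that the expected set of custom metric keys no longer includes the two removed metrics.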
```
[info] Run completed in 23 seconds, 724 milliseconds.
[info] Total number of tests run: 11
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 53 s, completed Aug 14, 2023, 3:30:56 PM
```
Closes #42491 from anishshri-db/task/SPARK-44809.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../execution/streaming/state/RocksDBStateStoreProvider.scala | 10 ++--------
.../streaming/state/RocksDBStateStoreIntegrationSuite.scala | 4 ++--
2 files changed, 4 insertions(+), 10 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 53fd06fd24c..37a8785f04d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -126,10 +126,8 @@ private[sql] class RocksDBStateStoreProvider
CUSTOM_METRIC_PUT_TIME -> sumNativeOpsLatencyMillis("put"),
CUSTOM_METRIC_GET_COUNT -> nativeOpsCount("get"),
CUSTOM_METRIC_PUT_COUNT -> nativeOpsCount("put"),
- CUSTOM_METRIC_WRITEBATCH_TIME -> commitLatencyMs("writeBatch"),
CUSTOM_METRIC_FLUSH_TIME -> commitLatencyMs("flush"),
CUSTOM_METRIC_COMMIT_COMPACT_TIME -> commitLatencyMs("compact"),
- CUSTOM_METRIC_PAUSE_TIME -> commitLatencyMs("pauseBg"),
CUSTOM_METRIC_CHECKPOINT_TIME -> commitLatencyMs("checkpoint"),
CUSTOM_METRIC_FILESYNC_TIME -> commitLatencyMs("fileSync"),
CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied,
@@ -270,14 +268,10 @@ object RocksDBStateStoreProvider {
"rocksdbPutCount", "RocksDB: number of put calls")
// Commit latency detailed breakdown
- val CUSTOM_METRIC_WRITEBATCH_TIME = StateStoreCustomTimingMetric(
- "rocksdbCommitWriteBatchLatency", "RocksDB: commit - write batch time")
val CUSTOM_METRIC_FLUSH_TIME = StateStoreCustomTimingMetric(
"rocksdbCommitFlushLatency", "RocksDB: commit - flush time")
val CUSTOM_METRIC_COMMIT_COMPACT_TIME = StateStoreCustomTimingMetric(
"rocksdbCommitCompactLatency", "RocksDB: commit - compact time")
- val CUSTOM_METRIC_PAUSE_TIME = StateStoreCustomTimingMetric(
- "rocksdbCommitPauseLatency", "RocksDB: commit - pause bg time")
val CUSTOM_METRIC_CHECKPOINT_TIME = StateStoreCustomTimingMetric(
"rocksdbCommitCheckpointLatency", "RocksDB: commit - checkpoint time")
val CUSTOM_METRIC_FILESYNC_TIME = StateStoreCustomTimingMetric(
@@ -332,8 +326,8 @@ object RocksDBStateStoreProvider {
val ALL_CUSTOM_METRICS = Seq(
CUSTOM_METRIC_SST_FILE_SIZE, CUSTOM_METRIC_GET_TIME,
CUSTOM_METRIC_PUT_TIME,
- CUSTOM_METRIC_WRITEBATCH_TIME, CUSTOM_METRIC_FLUSH_TIME,
CUSTOM_METRIC_COMMIT_COMPACT_TIME,
- CUSTOM_METRIC_PAUSE_TIME, CUSTOM_METRIC_CHECKPOINT_TIME,
CUSTOM_METRIC_FILESYNC_TIME,
+ CUSTOM_METRIC_FLUSH_TIME, CUSTOM_METRIC_COMMIT_COMPACT_TIME,
+ CUSTOM_METRIC_CHECKPOINT_TIME, CUSTOM_METRIC_FILESYNC_TIME,
CUSTOM_METRIC_BYTES_COPIED, CUSTOM_METRIC_FILES_COPIED,
CUSTOM_METRIC_FILES_REUSED,
CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED, CUSTOM_METRIC_GET_COUNT,
CUSTOM_METRIC_PUT_COUNT,
CUSTOM_METRIC_BLOCK_CACHE_MISS, CUSTOM_METRIC_BLOCK_CACHE_HITS,
CUSTOM_METRIC_BYTES_READ,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index 339d00058fc..2eb7d98bea8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -94,8 +94,8 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
val stateOperatorMetrics = nextProgress.stateOperators(0)
assert(JavaConverters.asScalaSet(stateOperatorMetrics.customMetrics.keySet) ===
Set(
"rocksdbGetLatency", "rocksdbCommitCompactLatency",
"rocksdbBytesCopied",
- "rocksdbPutLatency", "rocksdbCommitPauseLatency",
"rocksdbFilesReused",
- "rocksdbCommitWriteBatchLatency", "rocksdbFilesCopied",
"rocksdbSstFileSize",
+ "rocksdbPutLatency", "rocksdbFilesReused",
+ "rocksdbFilesCopied", "rocksdbSstFileSize",
"rocksdbCommitCheckpointLatency",
"rocksdbZipFileBytesUncompressed",
"rocksdbCommitFlushLatency", "rocksdbCommitFileSyncLatencyMs",
"rocksdbGetCount",
"rocksdbPutCount", "rocksdbTotalBytesRead",
"rocksdbTotalBytesWritten",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]