This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5970d353360d [SPARK-45767][CORE] Delete `TimeStampedHashMap` and its UT
5970d353360d is described below
commit 5970d353360d4fb6647c8fbc10f733abf009eca1
Author: panbingkun <[email protected]>
AuthorDate: Thu Nov 2 23:04:07 2023 +0800
[SPARK-45767][CORE] Delete `TimeStampedHashMap` and its UT
### What changes were proposed in this pull request?
This PR aims to delete `TimeStampedHashMap` and its UT.
### Why are the changes needed?
While working on PR https://github.com/apache/spark/pull/43578, we found that the
class `TimeStampedHashMap` is no longer in use. Based on the review suggestion
(https://github.com/apache/spark/pull/43578#discussion_r1378687555), we remove it here.
- The class `TimeStampedHashMap` was first introduced in
https://github.com/apache/spark/commit/b18d70870a33a4783c6b3b787bef9b0eec30bce0#diff-77b12178a7036c71135074c6ddf7d659e5a69906264d5e3061087e4352e304ed
- After https://github.com/apache/spark/pull/22339, `TimeStampedHashMap` was only
used by its UT `TimeStampedHashMapSuite`. So, since Spark 3.0, this data structure
has not been used by any production code in Spark.
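
For reference, below is a minimal sketch of what the removed class offered, based only on the API visible in the deleted sources further down. The class was `private[spark]`, and the object name and sleep/threshold values here are illustrative, not code that exists in Spark:

```scala
package org.apache.spark.util

// Illustrative only: TimeStampedHashMap was private[spark], so any caller had to
// live under the org.apache.spark package.
object TimeStampedHashMapSketch {
  def main(args: Array[String]): Unit = {
    // A drop-in replacement for mutable.Map that records an insertion timestamp per entry.
    val map = new TimeStampedHashMap[String, String](updateTimeStampOnGet = false)
    map("k1") = "v1"                            // put; records the current time
    assert(map.get("k1") == Some("v1"))         // reads behave like a normal mutable.Map
    Thread.sleep(10)
    val threshTime = System.currentTimeMillis   // entries inserted before this are "old"
    map.clearOldValues(threshTime)              // drops entries whose timestamp < threshTime
    assert(map.get("k1").isEmpty)               // "k1" was evicted
  }
}
```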
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43633 from panbingkun/remove_TimeStampedHashMap.
Authored-by: panbingkun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../org/apache/spark/util/TimeStampedHashMap.scala | 143 ----------------
.../spark/util/TimeStampedHashMapSuite.scala | 179 ---------------------
2 files changed, 322 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
deleted file mode 100644
index b0fb33946520..000000000000
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util.Map.Entry
-import java.util.Set
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-import org.apache.spark.internal.Logging
-
-private[spark] case class TimeStampedValue[V](value: V, timestamp: Long)
-
-/**
- * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
- * timestamp along with each key-value pair. If specified, the timestamp of each pair can be
- * updated every time it is accessed. Key-value pairs whose timestamp are older than a particular
- * threshold time can then be removed using the clearOldValues method. This is intended to
- * be a drop-in replacement of scala.collection.mutable.HashMap.
- *
- * @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed
- */
-private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
- extends mutable.Map[A, B]() with Logging {
-
- // Note: this class supports Scala 2.13. A parallel source tree has a 2.12 implementation.
-
- private val internalMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()
-
- def get(key: A): Option[B] = {
- val value = internalMap.get(key)
- if (value != null && updateTimeStampOnGet) {
- internalMap.replace(key, value, TimeStampedValue(value.value, currentTime))
- }
- Option(value).map(_.value)
- }
-
- def iterator: Iterator[(A, B)] = {
- getEntrySet.iterator.asScala.map(kv => (kv.getKey, kv.getValue.value))
- }
-
- def getEntrySet: Set[Entry[A, TimeStampedValue[B]]] = internalMap.entrySet
-
- override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = {
- val newMap = new TimeStampedHashMap[A, B1]
- val oldInternalMap = this.internalMap.asInstanceOf[ConcurrentHashMap[A, TimeStampedValue[B1]]]
- newMap.internalMap.putAll(oldInternalMap)
- kv match { case (a, b) => newMap.internalMap.put(a, TimeStampedValue(b, currentTime)) }
- newMap
- }
-
- override def addOne(kv: (A, B)): this.type = {
- kv match { case (a, b) => internalMap.put(a, TimeStampedValue(b, currentTime)) }
- this
- }
-
- override def subtractOne(key: A): this.type = {
- internalMap.remove(key)
- this
- }
-
- override def update(key: A, value: B): Unit = {
- this += ((key, value))
- }
-
- override def apply(key: A): B = {
- get(key).getOrElse { throw new NoSuchElementException() }
- }
-
- override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = {
- internalMap.asScala.map { case (k, TimeStampedValue(v, t)) => (k, v) }.filter(p)
- }
-
- override def empty: mutable.Map[A, B] = new TimeStampedHashMap[A, B]()
-
- override def size: Int = internalMap.size
-
- override def foreach[U](f: ((A, B)) => U): Unit = {
- val it = getEntrySet.iterator
- while(it.hasNext) {
- val entry = it.next()
- val kv = (entry.getKey, entry.getValue.value)
- f(kv)
- }
- }
-
- def putIfAbsent(key: A, value: B): Option[B] = {
- val prev = internalMap.putIfAbsent(key, TimeStampedValue(value, currentTime))
- Option(prev).map(_.value)
- }
-
- def putAll(map: Map[A, B]): Unit = {
- map.foreach { case (k, v) => update(k, v) }
- }
-
- def toMap: Map[A, B] = iterator.toMap
-
- def clearOldValues(threshTime: Long, f: (A, B) => Unit): Unit = {
- val it = getEntrySet.iterator
- while (it.hasNext) {
- val entry = it.next()
- if (entry.getValue.timestamp < threshTime) {
- f(entry.getKey, entry.getValue.value)
- logDebug("Removing key " + entry.getKey)
- it.remove()
- }
- }
- }
-
- /** Removes old key-value pairs that have timestamp earlier than `threshTime`. */
- def clearOldValues(threshTime: Long): Unit = {
- clearOldValues(threshTime, (_, _) => ())
- }
-
- private def currentTime: Long = System.currentTimeMillis
-
- // For testing
-
- def getTimeStampedValue(key: A): Option[TimeStampedValue[B]] = {
- Option(internalMap.get(key))
- }
-
- def getTimestamp(key: A): Option[Long] = {
- getTimeStampedValue(key).map(_.timestamp)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
deleted file mode 100644
index 164454094683..000000000000
--- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-
-import org.apache.spark.SparkFunSuite
-
-class TimeStampedHashMapSuite extends SparkFunSuite {
-
- // Test the testMap function - a Scala HashMap should obviously pass
- testMap(new mutable.HashMap[String, String]())
-
- // Test TimeStampedHashMap basic functionality
- testMap(new TimeStampedHashMap[String, String]())
- testMapThreadSafety(new TimeStampedHashMap[String, String]())
-
- test("TimeStampedHashMap - clearing by timestamp") {
- // clearing by insertion time
- val map = new TimeStampedHashMap[String, String](updateTimeStampOnGet = false)
- map("k1") = "v1"
- assert(map("k1") === "v1")
- Thread.sleep(10)
- val threshTime = System.currentTimeMillis
- assert(map.getTimestamp("k1").isDefined)
- assert(map.getTimestamp("k1").get < threshTime)
- map.clearOldValues(threshTime)
- assert(map.get("k1") === None)
-
- // clearing by modification time
- val map1 = new TimeStampedHashMap[String, String](updateTimeStampOnGet = true)
- map1("k1") = "v1"
- map1("k2") = "v2"
- assert(map1("k1") === "v1")
- Thread.sleep(10)
- val threshTime1 = System.currentTimeMillis
- Thread.sleep(10)
- assert(map1("k2") === "v2") // access k2 to update its access time to > threshTime
- assert(map1.getTimestamp("k1").isDefined)
- assert(map1.getTimestamp("k1").get < threshTime1)
- assert(map1.getTimestamp("k2").isDefined)
- assert(map1.getTimestamp("k2").get >= threshTime1)
- map1.clearOldValues(threshTime1) // should only clear k1
- assert(map1.get("k1") === None)
- assert(map1.get("k2").isDefined)
- }
-
- /** Test basic operations of a Scala mutable Map. */
- def testMap(hashMapConstructor: => mutable.Map[String, String]): Unit = {
- def newMap() = hashMapConstructor
- val testMap1 = newMap()
- val testMap2 = newMap()
- val name = testMap1.getClass.getSimpleName
-
- test(name + " - basic test") {
- // put, get, and apply
- testMap1 += (("k1", "v1"))
- assert(testMap1.get("k1").isDefined)
- assert(testMap1("k1") === "v1")
- testMap1("k2") = "v2"
- assert(testMap1.get("k2").isDefined)
- assert(testMap1("k2") === "v2")
- assert(testMap1("k2") === "v2")
- testMap1.update("k3", "v3")
- assert(testMap1.get("k3").isDefined)
- assert(testMap1("k3") === "v3")
-
- // remove
- testMap1.remove("k1")
- assert(testMap1.get("k1").isEmpty)
- testMap1.remove("k2")
- intercept[NoSuchElementException] {
- testMap1("k2") // Map.apply(<non-existent-key>) causes exception
- }
- testMap1 -= "k3"
- assert(testMap1.get("k3").isEmpty)
-
- // multi put
- val keys = (1 to 100).map(_.toString)
- val pairs = keys.map(x => (x, x * 2))
- assert((testMap2 ++ pairs).iterator.toSet === pairs.toSet)
- testMap2 ++= pairs
-
- // iterator
- assert(testMap2.iterator.toSet === pairs.toSet)
-
- // filter
- val filtered = testMap2.filter { case (_, v) => v.toInt % 2 == 0 }
- val evenPairs = pairs.filter { case (_, v) => v.toInt % 2 == 0 }
- assert(filtered.iterator.toSet === evenPairs.toSet)
-
- // foreach
- val buffer = new ArrayBuffer[(String, String)]
- testMap2.foreach(x => buffer += x)
- assert(testMap2.toSet === buffer.toSet)
-
- // multi remove
- testMap2("k1") = "v1"
- testMap2 --= keys
- assert(testMap2.size === 1)
- assert(testMap2.iterator.toSeq.head === (("k1", "v1")))
-
- // +
- val testMap3 = testMap2 + (("k0", "v0"))
- assert(testMap3.size === 2)
- assert(testMap3.get("k1").isDefined)
- assert(testMap3("k1") === "v1")
- assert(testMap3.get("k0").isDefined)
- assert(testMap3("k0") === "v0")
-
- // -
- val testMap4 = testMap3 - "k0"
- assert(testMap4.size === 1)
- assert(testMap4.get("k1").isDefined)
- assert(testMap4("k1") === "v1")
- }
- }
-
- /** Test thread safety of a Scala mutable map. */
- def testMapThreadSafety(hashMapConstructor: => mutable.Map[String, String]): Unit = {
- def newMap() = hashMapConstructor
- val name = newMap().getClass.getSimpleName
- val testMap = newMap()
- @volatile var error = false
-
- def getRandomKey(m: mutable.Map[String, String]): Option[String] = {
- val keys = testMap.keysIterator.toSeq
- if (keys.nonEmpty) {
- Some(keys(Random.nextInt(keys.size)))
- } else {
- None
- }
- }
-
- val threads = (1 to 25).map(i => new Thread() {
- override def run(): Unit = {
- try {
- for (j <- 1 to 1000) {
- Random.nextInt(3) match {
- case 0 =>
- testMap(Random.nextString(10)) = Random.nextDouble().toString // put
- case 1 =>
- getRandomKey(testMap).map(testMap.get) // get
- case 2 =>
- getRandomKey(testMap).map(testMap.remove) // remove
- }
- }
- } catch {
- case t: Throwable =>
- error = true
- throw t
- }
- }
- })
-
- test(name + " - threading safety test") {
- threads.foreach(_.start())
- threads.foreach(_.join())
- assert(!error)
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]