This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 01cf6f4 [SPARK-34309][BUILD][CORE][SQL][K8S] Use Caffeine instead of Guava Cache 01cf6f4 is described below commit 01cf6f4c6b2a593a2a8717fd2cda13725424120e Author: yangjie01 <yangji...@baidu.com> AuthorDate: Wed Aug 4 12:01:44 2021 -0700 [SPARK-34309][BUILD][CORE][SQL][K8S] Use Caffeine instead of Guava Cache ### What changes were proposed in this pull request? There are 3 ways Guava Cache is used in Spark code: 1. `LoadingCache` is the main way Guava Cache is used in Spark code; the key usages are as follows: a. `LoadingCache` with the `maximumSize` data eviction policy, such as `appCache` in `ApplicationCache` and `cache` in `CodeGenerator` b. `LoadingCache` with the `maximumWeight` data eviction policy, such as `shuffleIndexCache` in `ExternalShuffleBlockResolver` c. `LoadingCache` with the `expireAfterWrite` data eviction policy, such as `tableRelationCache` in `SessionCatalog` 2. `ManualCache` is another way Guava Cache is used in Spark code; the key usage is `cache` in `SharedInMemoryCache`, which is used to cache partition file statuses in memory 3. The last usage is `hadoopJobMetadata` in `SparkEnv`, which uses Guava Cache to build a `soft-reference map`. The goal of this PR is to use `Caffeine` instead of `Guava Cache` because `Caffeine` is faster than `Guava Cache` according to benchmarks. The main changes are as follows: 1. Add the `Caffeine` dependency to the Maven `pom.xml` files 2. Use `Caffeine` instead of Guava `LoadingCache`, `ManualCache`, and the soft-reference map in `SparkEnv` 3. Add `LocalCacheBenchmark` to compare the performance of `LoadingCache` between `Guava Cache` and `Caffeine` ### Why are the changes needed? `Caffeine` is faster than `Guava Cache` according to benchmarks ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? 
- Pass the Jenkins or GitHub Action - Add `LocalCacheBenchmark` to compare the performance of `LoadingCache` between `Guava Cache` and `Caffeine` Closes #31517 from LuciferYang/guava-cache-to-caffeine. Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Holden Karau <hka...@netflix.com> --- common/network-shuffle/pom.xml | 4 + .../shuffle/ExternalShuffleBlockResolver.java | 32 +++----- .../network/shuffle/RemoteBlockPushResolver.java | 24 +++--- .../LocalCacheBenchmark-jdk11-results.txt | 12 +++ core/benchmarks/LocalCacheBenchmark-results.txt | 12 +++ core/pom.xml | 4 + .../src/main/scala/org/apache/spark/SparkEnv.scala | 4 +- .../spark/deploy/history/ApplicationCache.scala | 37 +++++---- .../apache/spark/rdd/ReliableCheckpointRDD.scala | 24 +++--- .../org/apache/spark/storage/BlockManager.scala | 4 +- .../org/apache/spark/storage/BlockManagerId.scala | 15 ++-- .../spark/storage/BlockManagerMasterEndpoint.scala | 4 +- .../main/scala/org/apache/spark/util/Utils.scala | 19 +++-- .../org/apache/spark/LocalCacheBenchmark.scala | 93 ++++++++++++++++++++++ .../org/apache/spark/executor/ExecutorSuite.scala | 18 ++--- dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 3 + dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 3 + pom.xml | 6 ++ resource-managers/kubernetes/core/pom.xml | 5 ++ .../cluster/k8s/ExecutorPodsLifecycleManager.scala | 9 ++- sql/catalyst/pom.xml | 4 + .../sql/catalyst/catalog/SessionCatalog.scala | 9 +-- .../expressions/SubExprEvaluationRuntime.scala | 21 ++--- .../expressions/codegen/CodeGenerator.scala | 6 +- .../catalyst/util/DateTimeFormatterHelper.scala | 4 +- ...CodeGeneratorWithInterpretedFallbackSuite.scala | 4 +- .../SubExprEvaluationRuntimeSuite.scala | 20 ++--- sql/core/pom.xml | 5 +- .../execution/datasources/FileStatusCache.scala | 13 ++- .../spark/sql/execution/metric/SQLMetrics.scala | 4 +- 30 files changed, 283 insertions(+), 139 deletions(-) diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index d3d78f2..1b78182 100644
--- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -59,6 +59,10 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 73d4e6c..650f33e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -20,10 +20,7 @@ package org.apache.spark.network.shuffle; import java.io.*; import java.nio.charset.StandardCharsets; import java.util.*; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import java.util.stream.Collectors; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -32,11 +29,10 @@ import org.apache.commons.lang3.tuple.Pair; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.Weigher; import com.google.common.collect.Maps; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; @@ -109,23 +105,13 @@ 
public class ExternalShuffleBlockResolver { Executor directoryCleaner) throws IOException { this.conf = conf; this.rddFetchEnabled = - Boolean.valueOf(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false")); + Boolean.parseBoolean(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false")); this.registeredExecutorFile = registeredExecutorFile; String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m"); - CacheLoader<File, ShuffleIndexInformation> indexCacheLoader = - new CacheLoader<File, ShuffleIndexInformation>() { - public ShuffleIndexInformation load(File file) throws IOException { - return new ShuffleIndexInformation(file); - } - }; - shuffleIndexCache = CacheBuilder.newBuilder() + shuffleIndexCache = Caffeine.newBuilder() .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize)) - .weigher(new Weigher<File, ShuffleIndexInformation>() { - public int weigh(File file, ShuffleIndexInformation indexInfo) { - return indexInfo.getSize(); - } - }) - .build(indexCacheLoader); + .weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize()) + .build(ShuffleIndexInformation::new); db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper); if (db != null) { executors = reloadRegisteredExecutors(db); @@ -317,7 +303,7 @@ public class ExternalShuffleBlockResolver { "shuffle_" + shuffleId + "_" + mapId + "_0.data"), shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength()); - } catch (ExecutionException e) { + } catch (CompletionException e) { throw new RuntimeException("Failed to open file: " + indexFile, e); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 9a45f2c..8578843 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -31,9 +31,13 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.*; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; @@ -43,10 +47,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.Weigher; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import org.roaringbitmap.RoaringBitmap; @@ -115,16 +115,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner")); this.minChunkSize = conf.minChunkSizeInMergedShuffleFile(); this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge(); - CacheLoader<File, ShuffleIndexInformation> indexCacheLoader = - new CacheLoader<File, ShuffleIndexInformation>() { - public ShuffleIndexInformation load(File file) throws IOException { - return new ShuffleIndexInformation(file); - } - }; - indexCache = CacheBuilder.newBuilder() + indexCache = Caffeine.newBuilder() .maximumWeight(conf.mergedIndexCacheSize()) - .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize()) - 
.build(indexCacheLoader); + .weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize()) + .build(ShuffleIndexInformation::new); this.errorHandler = new ErrorHandler.BlockPushErrorHandler(); } @@ -299,7 +293,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(chunkId); return new FileSegmentManagedBuffer( conf, dataFile, shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength()); - } catch (ExecutionException e) { + } catch (CompletionException e) { throw new RuntimeException(String.format( "Failed to open merged shuffle index file %s", indexFile.getPath()), e); } diff --git a/core/benchmarks/LocalCacheBenchmark-jdk11-results.txt b/core/benchmarks/LocalCacheBenchmark-jdk11-results.txt new file mode 100644 index 0000000..ceca386 --- /dev/null +++ b/core/benchmarks/LocalCacheBenchmark-jdk11-results.txt @@ -0,0 +1,12 @@ +================================================================================================ +Loading Cache +================================================================================================ + +OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.7 +Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz +Loading Cache: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------- +Guava Cache 5 6 1 15.9 62.8 1.0X +Caffeine 2 2 0 46.1 21.7 2.9X + + diff --git a/core/benchmarks/LocalCacheBenchmark-results.txt b/core/benchmarks/LocalCacheBenchmark-results.txt new file mode 100644 index 0000000..563d470 --- /dev/null +++ b/core/benchmarks/LocalCacheBenchmark-results.txt @@ -0,0 +1,12 @@ +================================================================================================ +Loading Cache 
+================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_232-b18 on Mac OS X 10.15.7 +Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz +Loading Cache: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------- +Guava Cache 5 5 0 16.7 60.0 1.0X +Caffeine 2 2 1 44.3 22.6 2.7X + + diff --git a/core/pom.xml b/core/pom.xml index be44964..f3f9e48 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -48,6 +48,10 @@ <artifactId>guava</artifactId> </dependency> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + </dependency> + <dependency> <groupId>com.twitter</groupId> <artifactId>chill_${scala.binary.version}</artifactId> </dependency> diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ee50a8f..dae5568 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -26,7 +26,7 @@ import scala.collection.concurrent import scala.collection.mutable import scala.util.Properties -import com.google.common.cache.CacheBuilder +import com.github.benmanes.caffeine.cache.Caffeine import org.apache.hadoop.conf.Configuration import org.apache.spark.annotation.DeveloperApi @@ -77,7 +77,7 @@ class SparkEnv ( // A general, soft-reference map for metadata needed during HadoopRDD split computation // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). 
private[spark] val hadoopJobMetadata = - CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap() + Caffeine.newBuilder().softValues().build[String, AnyRef]().asMap() private[spark] var driverTmpDir: Option[String] = None diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala index 89b30a3..2be417a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala @@ -18,15 +18,14 @@ package org.apache.spark.deploy.history import java.util.NoSuchElementException -import java.util.concurrent.ExecutionException +import java.util.concurrent.CompletionException import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse} import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.JavaConverters._ import com.codahale.metrics.{Counter, MetricRegistry, Timer} -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification} -import com.google.common.util.concurrent.UncheckedExecutionException +import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache, RemovalCause, RemovalListener} import org.eclipse.jetty.servlet.FilterHolder import org.apache.spark.internal.Logging @@ -62,21 +61,27 @@ private[history] class ApplicationCache( /** * Removal event notifies the provider to detach the UI. 
- * @param rm removal notification + * @param key removal key + * @param value removal value + * @param cause the reason why a `CacheEntry` was removed, it should + * always be `SIZE` because `appCache` configured with + * `maximumSize` eviction strategy */ - override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = { + override def onRemoval(key: CacheKey, value: CacheEntry, cause: RemovalCause): Unit = { metrics.evictionCount.inc() - val key = rm.getKey - logDebug(s"Evicting entry ${key}") - operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui) + logDebug(s"Evicting entry $key") + operations.detachSparkUI(key.appId, key.attemptId, value.loadedUI.ui) } } private val appCache: LoadingCache[CacheKey, CacheEntry] = { - CacheBuilder.newBuilder() - .maximumSize(retainedApplications) - .removalListener(removalListener) - .build(appLoader) + val builder = Caffeine.newBuilder() + .maximumSize(retainedApplications) + .removalListener(removalListener) + // SPARK-34309: Use custom Executor to compatible with + // the data eviction behavior of Guava cache + .executor((command: Runnable) => command.run()) + builder.build[CacheKey, CacheEntry](appLoader) } /** @@ -86,9 +91,9 @@ private[history] class ApplicationCache( def get(appId: String, attemptId: Option[String] = None): CacheEntry = { try { - appCache.get(new CacheKey(appId, attemptId)) + appCache.get(CacheKey(appId, attemptId)) } catch { - case e @ (_: ExecutionException | _: UncheckedExecutionException) => + case e @ (_: CompletionException | _: RuntimeException) => throw Option(e.getCause()).getOrElse(e) } } @@ -127,7 +132,7 @@ private[history] class ApplicationCache( } /** @return Number of cached UIs. 
*/ - def size(): Long = appCache.size() + def size(): Long = appCache.estimatedSize() private def time[T](t: Timer)(f: => T): T = { val timeCtx = t.time() @@ -197,7 +202,7 @@ private[history] class ApplicationCache( val sb = new StringBuilder(s"ApplicationCache(" + s" retainedApplications= $retainedApplications)") sb.append(s"; time= ${clock.getTimeMillis()}") - sb.append(s"; entry count= ${appCache.size()}\n") + sb.append(s"; entry count= ${appCache.estimatedSize()}\n") sb.append("----\n") appCache.asMap().asScala.foreach { case(key, entry) => sb.append(s" $key -> $entry\n") diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index 7339eb6..f47f823 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit import scala.reflect.ClassTag import scala.util.control.NonFatal -import com.google.common.cache.{CacheBuilder, CacheLoader} +import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine} import org.apache.hadoop.fs.Path import org.apache.spark._ @@ -85,16 +85,18 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag]( } // Cache of preferred locations of checkpointed files. 
- @transient private[spark] lazy val cachedPreferredLocations = CacheBuilder.newBuilder() - .expireAfterWrite( - SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get, - TimeUnit.MINUTES) - .build( - new CacheLoader[Partition, Seq[String]]() { - override def load(split: Partition): Seq[String] = { - getPartitionBlockLocations(split) - } - }) + @transient private[spark] lazy val cachedPreferredLocations = { + val builder = Caffeine.newBuilder() + .expireAfterWrite( + SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get, + TimeUnit.MINUTES) + val loader = new CacheLoader[Partition, Seq[String]]() { + override def load(split: Partition): Seq[String] = { + getPartitionBlockLocations(split) + } + } + builder.build[Partition, Seq[String]](loader) + } // Returns the block locations of given partition on file system. private def getPartitionBlockLocations(split: Partition): Seq[String] = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 4c646b2..20d1e03 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -34,7 +34,7 @@ import scala.util.{Failure, Random, Success, Try} import scala.util.control.NonFatal import com.codahale.metrics.{MetricRegistry, MetricSet} -import com.google.common.cache.CacheBuilder +import com.github.benmanes.caffeine.cache.Caffeine import org.apache.commons.io.IOUtils import org.apache.spark._ @@ -123,7 +123,7 @@ private[spark] class HostLocalDirManager( blockStoreClient: BlockStoreClient) extends Logging { private val executorIdToLocalDirsCache = - CacheBuilder + Caffeine .newBuilder() .maximumSize(cacheSize) .build[String, Array[String]]() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index c6a4457..316ad69 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} -import com.google.common.cache.{CacheBuilder, CacheLoader} +import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine} import org.apache.spark.SparkContext import org.apache.spark.annotation.DeveloperApi @@ -136,11 +136,14 @@ private[spark] object BlockManagerId { * The max cache size is hardcoded to 10000, since the size of a BlockManagerId * object is about 48B, the total memory cost should be below 1MB which is feasible. */ - val blockManagerIdCache = CacheBuilder.newBuilder() - .maximumSize(10000) - .build(new CacheLoader[BlockManagerId, BlockManagerId]() { - override def load(id: BlockManagerId) = id - }) + val blockManagerIdCache = { + Caffeine.newBuilder() + .maximumSize(10000) + .build[BlockManagerId, BlockManagerId]( + new CacheLoader[BlockManagerId, BlockManagerId]() { + override def load(id: BlockManagerId): BlockManagerId = id + }) + } def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = { blockManagerIdCache.get(id) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 29c605d..ef82d52 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -27,7 +27,7 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException} import scala.util.Random import scala.util.control.NonFatal -import com.google.common.cache.CacheBuilder +import com.github.benmanes.caffeine.cache.Caffeine import org.apache.spark.{MapOutputTrackerMaster, SparkConf} import org.apache.spark.annotation.DeveloperApi @@ -56,7 +56,7 @@ class BlockManagerMasterEndpoint( 
// Mapping from executor id to the block manager's local disk directories. private val executorIdToLocalDirs = - CacheBuilder + Caffeine .newBuilder() .maximumSize(conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE)) .build[String, Array[String]]() diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 7ea96fe..f3268cb 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -44,7 +44,7 @@ import scala.util.control.{ControlThrowable, NonFatal} import scala.util.matching.Regex import _root_.io.netty.channel.unix.Errors.NativeIoException -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache} import com.google.common.collect.Interners import com.google.common.io.{ByteStreams, Files => GFiles} import com.google.common.net.InetAddresses @@ -1616,13 +1616,16 @@ private[spark] object Utils extends Logging { if (compressedLogFileLengthCache == null) { val compressedLogFileLengthCacheSize = sparkConf.get( UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF) - compressedLogFileLengthCache = CacheBuilder.newBuilder() - .maximumSize(compressedLogFileLengthCacheSize) - .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() { - override def load(path: String): java.lang.Long = { - Utils.getCompressedFileLength(new File(path)) - } - }) + compressedLogFileLengthCache = { + val builder = Caffeine.newBuilder() + .maximumSize(compressedLogFileLengthCacheSize) + builder.build[String, java.lang.Long]( + new CacheLoader[String, java.lang.Long]() { + override def load(path: String): java.lang.Long = { + Utils.getCompressedFileLength(new File(path)) + } + }) + } } compressedLogFileLengthCache } diff --git a/core/src/test/scala/org/apache/spark/LocalCacheBenchmark.scala 
b/core/src/test/scala/org/apache/spark/LocalCacheBenchmark.scala new file mode 100644 index 0000000..5eadfdf --- /dev/null +++ b/core/src/test/scala/org/apache/spark/LocalCacheBenchmark.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.Callable + +import scala.concurrent.duration.Duration +import scala.util.Random + +import com.github.benmanes.caffeine.cache.{CacheLoader => CaffeineCacheLoader, Caffeine} +import com.google.common.cache.{CacheBuilder, CacheLoader} + +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.util.ThreadUtils + +/** + * Benchmark for Guava Cache vs Caffeine. + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class <this class> --jars <spark core test jar> + * 2. build/sbt "core/test:runMain <this class>" + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>" + * Results will be written to "benchmarks/LocalCacheBenchmark-results.txt". 
+ * }}} + */ +object LocalCacheBenchmark extends BenchmarkBase { + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + runBenchmark("Loading Cache") { + val size = 10000 + val parallelism = 8 + val guavaCacheConcurrencyLevel = 8 + val dataset = (1 to parallelism) + .map(_ => Random.shuffle(List.range(0, size))) + .map(list => list.map(i => TestData(i))) + val executor = ThreadUtils.newDaemonFixedThreadPool(parallelism, "Loading Cache Test Pool") + val guavaCacheLoader = new CacheLoader[TestData, TestData]() { + override def load(id: TestData): TestData = { + id + } + } + val caffeineCacheLoader = new CaffeineCacheLoader[TestData, TestData]() { + override def load(id: TestData): TestData = { + id + } + } + + val benchmark = new Benchmark("Loading Cache", size * parallelism, 3, output = output) + benchmark.addCase("Guava Cache") { _ => + val cache = CacheBuilder.newBuilder() + .concurrencyLevel(guavaCacheConcurrencyLevel).build[TestData, TestData](guavaCacheLoader) + dataset.map(dataList => executor.submit(new Callable[Unit] { + override def call(): Unit = { + dataList.foreach(key => cache.get(key)) + } + })).foreach(future => ThreadUtils.awaitResult(future, Duration.Inf)) + cache.cleanUp() + } + + benchmark.addCase("Caffeine") { _ => + val cache = Caffeine.newBuilder().build[TestData, TestData](caffeineCacheLoader) + dataset.map(dataList => executor.submit(new Callable[Unit] { + override def call(): Unit = { + dataList.foreach(key => cache.get(key)) + } + })).foreach(future => ThreadUtils.awaitResult(future, Duration.Inf)) + cache.cleanUp() + } + + benchmark.run() + } + } + + case class TestData(content: Int) +} + diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index a237447..8ec279a 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -29,7 +29,7 @@ 
import scala.collection.immutable import scala.collection.mutable.{ArrayBuffer, Map} import scala.concurrent.duration._ -import com.google.common.cache.{CacheBuilder, CacheLoader} +import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine} import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers.{any, eq => meq} import org.mockito.Mockito.{inOrder, verify, when} @@ -467,9 +467,9 @@ class ExecutorSuite extends SparkFunSuite } } - def errorInGuavaCache(e: => Throwable): Throwable = { - val cache = CacheBuilder.newBuilder() - .build(new CacheLoader[String, String] { + def errorInCaffeine(e: => Throwable): Throwable = { + val cache = Caffeine.newBuilder().build[String, String]( + new CacheLoader[String, String] { override def load(key: String): String = throw e }) intercept[Throwable] { @@ -484,18 +484,18 @@ class ExecutorSuite extends SparkFunSuite import Executor.isFatalError // `e`'s depth is 1 so `depthToCheck` needs to be at least 3 to detect fatal errors. assert(isFatalError(e, depthToCheck) == (depthToCheck >= 1 && isFatal)) + assert(isFatalError(errorInCaffeine(e), depthToCheck) == (depthToCheck >= 1 && isFatal)) // `e`'s depth is 2 so `depthToCheck` needs to be at least 3 to detect fatal errors. assert(isFatalError(errorInThreadPool(e), depthToCheck) == (depthToCheck >= 2 && isFatal)) - assert(isFatalError(errorInGuavaCache(e), depthToCheck) == (depthToCheck >= 2 && isFatal)) assert(isFatalError( new SparkException("foo", e), depthToCheck) == (depthToCheck >= 2 && isFatal)) - // `e`'s depth is 3 so `depthToCheck` needs to be at least 3 to detect fatal errors. assert(isFatalError( - errorInThreadPool(errorInGuavaCache(e)), - depthToCheck) == (depthToCheck >= 3 && isFatal)) + errorInThreadPool(errorInCaffeine(e)), + depthToCheck) == (depthToCheck >= 2 && isFatal)) + // `e`'s depth is 3 so `depthToCheck` needs to be at least 3 to detect fatal errors. 
assert(isFatalError( - errorInGuavaCache(errorInThreadPool(e)), + errorInCaffeine(errorInThreadPool(e)), depthToCheck) == (depthToCheck >= 3 && isFatal)) assert(isFatalError( new SparkException("foo", new SparkException("foo", e)), diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 index 50b5bf0..92f6308 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 @@ -30,7 +30,9 @@ blas/2.2.0//blas-2.2.0.jar bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar breeze_2.12/1.2//breeze_2.12-1.2.jar +caffeine/2.9.1//caffeine-2.9.1.jar cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar +checker-qual/3.10.0//checker-qual-3.10.0.jar chill-java/0.10.0//chill-java-0.10.0.jar chill_2.12/0.10.0//chill_2.12-0.10.0.jar commons-beanutils/1.9.4//commons-beanutils-1.9.4.jar @@ -62,6 +64,7 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar derby/10.14.2.0//derby-10.14.2.0.jar dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar +error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar generex/1.0.2//generex-1.0.2.jar gson/2.2.4//gson-2.2.4.jar diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index 31c9b96..4cacf0a 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -25,7 +25,9 @@ blas/2.2.0//blas-2.2.0.jar bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar breeze_2.12/1.2//breeze_2.12-1.2.jar +caffeine/2.9.1//caffeine-2.9.1.jar cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar +checker-qual/3.10.0//checker-qual-3.10.0.jar chill-java/0.10.0//chill-java-0.10.0.jar chill_2.12/0.10.0//chill_2.12-0.10.0.jar commons-cli/1.2//commons-cli-1.2.jar @@ -53,6 
+55,7 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar derby/10.14.2.0//derby-10.14.2.0.jar dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar +error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar generex/1.0.2//generex-1.0.2.jar gson/2.2.4//gson-2.2.4.jar diff --git a/pom.xml b/pom.xml index d17e055..8f82d1e 100644 --- a/pom.xml +++ b/pom.xml @@ -182,6 +182,7 @@ <commons-pool2.version>2.6.2</commons-pool2.version> <datanucleus-core.version>4.1.17</datanucleus-core.version> <guava.version>14.0.1</guava.version> + <caffeine.version>2.9.1</caffeine.version> <janino.version>3.0.16</janino.version> <jersey.version>2.34</jersey.version> <joda.version>2.10.10</joda.version> @@ -493,6 +494,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>${caffeine.version}</version> + </dependency> + <dependency> <groupId>org.jpmml</groupId> <artifactId>pmml-model</artifactId> <version>1.4.8</version> diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 0cb5e11..cb9c7b1 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -53,6 +53,11 @@ </dependency> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + </dependency> + + <dependency> <groupId>io.fabric8</groupId> <artifactId>kubernetes-client</artifactId> <version>${kubernetes-client.version}</version> diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index e255de4..38e7f99 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -18,12 +18,13 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.TimeUnit -import com.google.common.cache.CacheBuilder -import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} -import io.fabric8.kubernetes.client.KubernetesClient import scala.collection.JavaConverters._ import scala.collection.mutable +import com.github.benmanes.caffeine.cache.Caffeine +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient + import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ @@ -47,7 +48,7 @@ private[spark] class ExecutorPodsLifecycleManager( // to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond // bounds. 
private lazy val removedExecutorsCache = - CacheBuilder.newBuilder() + Caffeine.newBuilder() .expireAfterWrite(3, TimeUnit.MINUTES) .build[java.lang.Long, java.lang.Long]() diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index f1d3a3a..22fa097 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -93,6 +93,10 @@ <scope>test</scope> </dependency> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + </dependency> + <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>janino</artifactId> </dependency> diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 4860f46..6b1f519 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -26,7 +26,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable import scala.util.{Failure, Success, Try} -import com.google.common.cache.{Cache, CacheBuilder} +import com.github.benmanes.caffeine.cache.{Cache, Caffeine} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -159,19 +159,18 @@ class SessionCatalog( } private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = { - var builder = CacheBuilder.newBuilder() + var builder = Caffeine.newBuilder() .maximumSize(cacheSize) if (cacheTTL > 0) { builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS) } - - builder.build[QualifiedTableName, LogicalPlan]() + builder.build() } /** This method provides a way to get a cached plan. 
*/ def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = { - tableRelationCache.get(t, c) + tableRelationCache.get(t, (_: QualifiedTableName) => c.call()) } /** This method provides a way to get a cached plan if the key exists. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala index fcc8ee6..8cfa466 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import java.util.IdentityHashMap -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache} import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.apache.spark.sql.catalyst.InternalRow @@ -38,14 +38,17 @@ class SubExprEvaluationRuntime(cacheMaxEntries: Int) { // won't be use by multi-threads so we don't need to consider concurrency here. 
private var proxyExpressionCurrentId = 0 - private[sql] val cache: LoadingCache[ExpressionProxy, ResultProxy] = CacheBuilder.newBuilder() - .maximumSize(cacheMaxEntries) - .build( - new CacheLoader[ExpressionProxy, ResultProxy]() { - override def load(expr: ExpressionProxy): ResultProxy = { - ResultProxy(expr.proxyEval(currentInput)) - } - }) + private[sql] val cache: LoadingCache[ExpressionProxy, ResultProxy] = + Caffeine.newBuilder().maximumSize(cacheMaxEntries) + // SPARK-34309: Use custom Executor to compatible with + // the data eviction behavior of Guava cache + .executor((command: Runnable) => command.run()) + .build[ExpressionProxy, ResultProxy]( + new CacheLoader[ExpressionProxy, ResultProxy]() { + override def load(expr: ExpressionProxy): ResultProxy = { + ResultProxy(expr.proxyEval(currentInput)) + } + }) private var currentInput: InternalRow = null diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 7f2c1c6..4dbfd77 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -25,7 +25,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import com.google.common.cache.{CacheBuilder, CacheLoader} +import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine} import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.codehaus.commons.compiler.CompileException import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, InternalCompilerException, SimpleCompiler} @@ -1577,9 +1577,9 @@ object CodeGenerator extends Logging { * automatically, in order to constrain its memory footprint. 
Note that this cache does not use * weak keys/values and thus does not respond to memory pressure. */ - private val cache = CacheBuilder.newBuilder() + private val cache = Caffeine.newBuilder() .maximumSize(SQLConf.get.codegenCacheMaxEntries) - .build( + .build[CodeAndComment, (GeneratedClass, ByteCodeStats)]( new CacheLoader[CodeAndComment, (GeneratedClass, ByteCodeStats)]() { override def load(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = { val startTime = System.nanoTime() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala index b00113b..9ae12c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala @@ -23,7 +23,7 @@ import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverSt import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries} import java.util.{Date, Locale} -import com.google.common.cache.CacheBuilder +import com.github.benmanes.caffeine.cache.Caffeine import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._ import org.apache.spark.sql.errors.QueryExecutionErrors @@ -194,7 +194,7 @@ trait DateTimeFormatterHelper { } private object DateTimeFormatterHelper { - val cache = CacheBuilder.newBuilder() + val cache = Caffeine.newBuilder() .maximumSize(128) .build[(String, Locale, Boolean), DateTimeFormatter]() diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala index da5bddb..ab177d0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala 
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.expressions -import java.util.concurrent.ExecutionException +import java.util.concurrent.CompletionException import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow @@ -83,7 +83,7 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT } test("codegen failures in the CODEGEN_ONLY mode") { - val errMsg = intercept[ExecutionException] { + val errMsg = intercept[CompletionException] { val input = Seq(BoundReference(0, IntegerType, nullable = true)) withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { FailedCodegenProjection.createObject(input) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala index f8dca26..88c1c0d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala @@ -23,47 +23,47 @@ class SubExprEvaluationRuntimeSuite extends SparkFunSuite { test("Evaluate ExpressionProxy should create cached result") { val runtime = new SubExprEvaluationRuntime(1) val proxy = ExpressionProxy(Literal(1), 0, runtime) - assert(runtime.cache.size() == 0) + assert(runtime.cache.estimatedSize() == 0) proxy.eval() - assert(runtime.cache.size() == 1) + assert(runtime.cache.estimatedSize() == 1) assert(runtime.cache.get(proxy) == ResultProxy(1)) } test("SubExprEvaluationRuntime cannot exceed configured max entries") { val runtime = new SubExprEvaluationRuntime(2) - assert(runtime.cache.size() == 0) + assert(runtime.cache.estimatedSize() == 0) val proxy1 = ExpressionProxy(Literal(1), 0, runtime) 
proxy1.eval() - assert(runtime.cache.size() == 1) + assert(runtime.cache.estimatedSize() == 1) assert(runtime.cache.get(proxy1) == ResultProxy(1)) val proxy2 = ExpressionProxy(Literal(2), 1, runtime) proxy2.eval() - assert(runtime.cache.size() == 2) + assert(runtime.cache.estimatedSize() == 2) assert(runtime.cache.get(proxy2) == ResultProxy(2)) val proxy3 = ExpressionProxy(Literal(3), 2, runtime) proxy3.eval() - assert(runtime.cache.size() == 2) + assert(runtime.cache.estimatedSize() == 2) assert(runtime.cache.get(proxy3) == ResultProxy(3)) } test("setInput should empty cached result") { val runtime = new SubExprEvaluationRuntime(2) val proxy1 = ExpressionProxy(Literal(1), 0, runtime) - assert(runtime.cache.size() == 0) + assert(runtime.cache.estimatedSize() == 0) proxy1.eval() - assert(runtime.cache.size() == 1) + assert(runtime.cache.estimatedSize() == 1) assert(runtime.cache.get(proxy1) == ResultProxy(1)) val proxy2 = ExpressionProxy(Literal(2), 1, runtime) proxy2.eval() - assert(runtime.cache.size() == 2) + assert(runtime.cache.estimatedSize() == 2) assert(runtime.cache.get(proxy2) == ResultProxy(2)) runtime.setInput() - assert(runtime.cache.size() == 0) + assert(runtime.cache.estimatedSize() == 0) } test("Wrap ExpressionProxy on subexpressions") { diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 73fa60c..5993e98 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -89,7 +89,10 @@ <type>test-jar</type> <scope>test</scope> </dependency> - + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + </dependency> <dependency> <groupId>org.apache.orc</groupId> <artifactId>orc-core</artifactId> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala index b5d800f..5db69a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ -import com.google.common.cache._ +import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener, Weigher} import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.internal.Logging @@ -119,11 +119,10 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends } } } - val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() { - override def onRemoval( - removed: RemovalNotification[(ClientId, Path), - Array[FileStatus]]): Unit = { - if (removed.getCause == RemovalCause.SIZE && + val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]] { + override def onRemoval(key: (ClientId, Path), value: Array[FileStatus], + cause: RemovalCause): Unit = { + if (cause == RemovalCause.SIZE && warnedAboutEviction.compareAndSet(false, true)) { logWarning( "Evicting cached table partition metadata from memory due to size constraints " + @@ -133,7 +132,7 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends } } - var builder = CacheBuilder.newBuilder() + var builder = Caffeine.newBuilder() .weigher(weigher) .removalListener(removalListener) .maximumWeight(maxSizeInBytes / weightScale) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index a613a39..d8fba8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -22,7 +22,7 @@ import java.util.{Arrays, Locale} import scala.concurrent.duration._ -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import 
com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache} import org.apache.spark.SparkContext import org.apache.spark.scheduler.AccumulableInfo @@ -97,7 +97,7 @@ object SQLMetrics { val cachedSQLAccumIdentifier = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER) private val metricsCache: LoadingCache[String, Option[String]] = - CacheBuilder.newBuilder().maximumSize(10000) + Caffeine.newBuilder().maximumSize(10000) .build(new CacheLoader[String, Option[String]] { override def load(name: String): Option[String] = { Option(name) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org