This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 77c790cdfec [SPARK-45602][CORE][SQL][SS][YARN][K8S] Replace 
`s.c.MapOps.filterKeys` with `s.c.MapOps.view.filterKeys`
77c790cdfec is described below

commit 77c790cdfec8fd1e49a7f479217487ea370bc25c
Author: yangjie01 <[email protected]>
AuthorDate: Fri Oct 20 23:01:27 2023 +0800

    [SPARK-45602][CORE][SQL][SS][YARN][K8S] Replace `s.c.MapOps.filterKeys` 
with `s.c.MapOps.view.filterKeys`
    
    ### What changes were proposed in this pull request?
    This PR replaces `s.c.MapOps.filterKeys` with `s.c.MapOps.view.filterKeys`
because `s.c.MapOps.filterKeys` has been deprecated since Scala 2.13.0:
    
    
https://github.com/scala/scala/blob/bf45e199e96383b96a6955520d7d2524c78e6e12/src/library/scala/collection/Map.scala#L248-L254
    
    ```scala
    @deprecated("Use .view.filterKeys(f). A future version will include a strict 
version of this method (for now, .view.filterKeys(p).toMap).", "2.13.0")
      def filterKeys(p: K => Boolean): MapView[K, V] = new 
MapView.FilterKeys(this, p)
    ```
    
    ### Why are the changes needed?
    Cleanup deprecated API usage.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43445 from LuciferYang/SPARK-45602.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 .../apache/spark/sql/kafka010/KafkaContinuousStream.scala   |  2 +-
 .../main/scala/org/apache/spark/deploy/master/Master.scala  |  1 +
 .../org/apache/spark/deploy/rest/RestSubmissionClient.scala |  2 +-
 .../spark/executor/CoarseGrainedExecutorBackend.scala       |  4 ++--
 .../scala/org/apache/spark/resource/ResourceProfile.scala   |  4 ++--
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala     |  2 +-
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala   |  2 +-
 .../scheduler/cluster/StandaloneSchedulerBackend.scala      |  2 +-
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala  |  2 +-
 core/src/main/scala/org/apache/spark/ui/PagedTable.scala    |  1 +
 .../scala/org/apache/spark/HeartbeatReceiverSuite.scala     |  2 +-
 .../test/scala/org/apache/spark/SparkThrowableSuite.scala   |  1 +
 .../apache/spark/internal/plugin/PluginContainerSuite.scala |  2 +-
 .../spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala |  4 ++--
 .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala     |  2 +-
 .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala  |  2 +-
 .../org/apache/spark/sql/catalyst/catalog/interface.scala   |  3 ++-
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala    |  2 +-
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala  |  2 +-
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala   |  2 +-
 .../org/apache/spark/sql/execution/command/tables.scala     |  6 +++---
 .../apache/spark/sql/execution/datasources/DataSource.scala |  2 +-
 .../apache/spark/sql/execution/datasources/FileFormat.scala |  3 ++-
 .../spark/sql/execution/datasources/jdbc/JDBCOptions.scala  |  2 +-
 .../spark/sql/execution/datasources/v2/CacheTableExec.scala |  2 +-
 .../sql/execution/datasources/v2/DataSourceV2Utils.scala    |  2 +-
 .../sql/execution/datasources/v2/FileDataSourceV2.scala     |  2 +-
 .../sql/execution/datasources/v2/ShowCreateTableExec.scala  |  4 ++--
 .../sql/execution/datasources/v2/V2SessionCatalog.scala     |  2 +-
 .../sql/execution/streaming/state/RocksDBFileManager.scala  |  2 +-
 .../org/apache/spark/sql/execution/ui/ExecutionPage.scala   |  4 ++--
 .../org/apache/spark/sql/streaming/DataStreamReader.scala   |  2 +-
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala   |  2 +-
 .../org/apache/spark/sql/hive/HiveExternalCatalog.scala     | 13 +++++++------
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala    |  5 +++--
 .../org/apache/spark/sql/hive/execution/HiveOptions.scala   |  2 +-
 .../apache/spark/sql/hive/HiveSchemaInferenceSuite.scala    |  2 +-
 .../scala/org/apache/spark/sql/hive/StatisticsSuite.scala   |  9 +++++----
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala  |  9 +++++----
 .../sql/hive/execution/command/ShowCreateTableSuite.scala   |  2 +-
 40 files changed, 65 insertions(+), 56 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 0ca39817279..026c4d56072 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -102,7 +102,7 @@ class KafkaContinuousStream(
     }
 
     val startOffsets = newPartitionOffsets ++
-      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
+      oldStartPartitionOffsets.view.filterKeys(!deletedPartitions.contains(_))
     knownPartitions = startOffsets.keySet
 
     startOffsets.toSeq.map {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e98f08d7ec4..46503e017ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -917,6 +917,7 @@ private[deploy] class Master(
   private def decommissionWorkersOnHosts(hostnames: Seq[String]): Integer = {
     val hostnamesSet = hostnames.map(_.toLowerCase(Locale.ROOT)).toSet
     val workersToRemove = addressToWorker
+      .view
       .filterKeys(addr => 
hostnamesSet.contains(addr.host.toLowerCase(Locale.ROOT)))
       .values
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index a690937c202..68f08dd951e 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -422,7 +422,7 @@ private[spark] object RestSubmissionClient {
    * Filter non-spark environment variables from any environment.
    */
   private[rest] def filterSystemEnvironment(env: Map[String, String]): 
Map[String, String] = {
-    env.filterKeys { k =>
+    env.view.filterKeys { k =>
       k.startsWith("SPARK_") && !EXCLUDED_SPARK_ENV_VARS.contains(k)
     }.toMap
   }
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 299148a912a..b074ac814a9 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -154,13 +154,13 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
-    sys.env.filterKeys(_.startsWith(prefix))
+    sys.env.view.filterKeys(_.startsWith(prefix))
       .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), 
e._2)).toMap
   }
 
   def extractAttributes: Map[String, String] = {
     val prefix = "SPARK_EXECUTOR_ATTRIBUTE_"
-    sys.env.filterKeys(_.startsWith(prefix))
+    sys.env.view.filterKeys(_.startsWith(prefix))
       .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), 
e._2)).toMap
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 6d6197d58a1..fdaa68b6931 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -95,12 +95,12 @@ class ResourceProfile(
   }
 
   private[spark] def getCustomTaskResources(): Map[String, 
TaskResourceRequest] = {
-    taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
+    taskResources.view.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
   }
 
   protected[spark] def getCustomExecutorResources(): Map[String, 
ExecutorResourceRequest] = {
     executorResources.
-      filterKeys(k => 
!ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
+      view.filterKeys(k => 
!ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
   }
 
   /*
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 07a71ebed08..230e49db536 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -838,7 +838,7 @@ private[spark] class DAGScheduler(
     if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
       logError("No stages registered for job " + job.jobId)
     } else {
-      stageIdToStage.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {
+      stageIdToStage.view.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {
         case (stageId, stage) =>
           val jobSet = stage.jobIds
           if (!jobSet.contains(job.jobId)) {
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c770e5c9950..274f867ce35 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -371,7 +371,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       // Make sure no executor is killed while some task is launching on it
       val taskDescs = withLock {
         // Filter out executors under killing
-        val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
+        val activeExecutors = executorDataMap.view.filterKeys(isExecutorActive)
         val workOffers = activeExecutors.map {
           case (id, executorData) => buildWorkerOffer(id, executorData)
         }.toIndexedSeq
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 9f71a0fe58c..f1be78ab93d 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -253,7 +253,7 @@ private[spark] class StandaloneSchedulerBackend(
 
   override def getDriverLogUrls: Option[Map[String, String]] = {
     val prefix = "SPARK_DRIVER_LOG_URL_"
-    val driverLogUrls = sys.env.filterKeys(_.startsWith(prefix))
+    val driverLogUrls = sys.env.view.filterKeys(_.startsWith(prefix))
       .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), 
e._2)).toMap
     if (driverLogUrls.nonEmpty) Some(driverLogUrls) else None
   }
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b21a2aa1c17..d539611271c 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -653,7 +653,7 @@ final class ShuffleBlockFetcherIterator(
         hostLocalDirManager.getHostLocalDirs(host, port, 
bmIds.map(_.executorId)) {
           case Success(dirsByExecId) =>
             fetchMultipleHostLocalBlocks(
-              hostLocalBlocksWithMissingDirs.filterKeys(bmIds.contains).toMap,
+              
hostLocalBlocksWithMissingDirs.view.filterKeys(bmIds.contains).toMap,
               dirsByExecId,
               cached = false)
 
diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala 
b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 99052628f6e..208b8189b4d 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -207,6 +207,7 @@ private[spark] trait PagedTable[T] {
           .withKeyValueSeparator("=")
           .split(search)
           .asScala
+          .view
           .filterKeys(_ != pageSizeFormField)
           .filterKeys(_ != pageNumberFormField)
           .mapValues(URLDecoder.decode(_, UTF_8.name()))
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index a8351322e01..95ef417239c 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -306,7 +306,7 @@ class HeartbeatReceiverSuite
     // We may receive undesired SparkListenerExecutorAdded from 
LocalSchedulerBackend,
     // so exclude it from the map. See SPARK-10800.
     heartbeatReceiver.invokePrivate(_executorLastSeen()).
-      filterKeys(_ != SparkContext.DRIVER_IDENTIFIER).toMap
+      view.filterKeys(_ != SparkContext.DRIVER_IDENTIFIER).toMap
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index 4d011398c63..fb82714949b 100644
--- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -152,6 +152,7 @@ class SparkThrowableSuite extends SparkFunSuite {
 
   test("Message format invariants") {
     val messageFormats = errorReader.errorInfoMap
+      .view
       .filterKeys(!_.startsWith("_LEGACY_ERROR_"))
       .filterKeys(!_.startsWith("INTERNAL_ERROR"))
       .values.toSeq.flatMap { i => Seq(i.messageTemplate) }
diff --git 
a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
 
b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
index 4bfa624fbd4..ef214bd50d9 100644
--- 
a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -264,7 +264,7 @@ object NonLocalModeSparkPlugin {
       resources: Map[String, ResourceInformation]): String = {
     // try to keep this simple and only write the gpus addresses, if we add 
more resources need to
     // make more complex
-    val resourcesString = resources.filterKeys(_.equals(GPU)).map {
+    val resourcesString = resources.view.filterKeys(_.equals(GPU)).map {
       case (_, ri) =>
         s"${ri.addresses.mkString(",")}"
     }.mkString(",")
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index e6c2171058d..e678489e100 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -187,7 +187,7 @@ class ExecutorPodsAllocator(
     // to the schedulerKnownNewlyCreatedExecs
     val schedulerKnownExecs = 
schedulerBackend.getExecutorIds().map(_.toLong).toSet
     schedulerKnownNewlyCreatedExecs ++=
-      
newlyCreatedExecutors.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1)
+      
newlyCreatedExecutors.view.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1)
     newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet
 
     // For all executors we've created against the API but have not seen in a 
snapshot
@@ -239,7 +239,7 @@ class ExecutorPodsAllocator(
       _deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs)
     }
 
-    val notDeletedPods = 
lastSnapshot.executorPods.filterKeys(!_deletedExecutorIds.contains(_))
+    val notDeletedPods = 
lastSnapshot.executorPods.view.filterKeys(!_deletedExecutorIds.contains(_))
     // Map the pods into per ResourceProfile id so we can check per 
ResourceProfile,
     // add a fast path if not using other ResourceProfiles.
     val rpIdToExecsAndPodState =
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index d4c24e8a39f..cdaf04173c2 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -205,7 +205,7 @@ private[yarn] class ExecutorRunnable(
     val env = new HashMap[String, String]()
     Client.populateClasspath(null, conf, sparkConf, env, 
sparkConf.get(EXECUTOR_CLASS_PATH))
 
-    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
+    System.getenv().asScala.view.filterKeys(_.startsWith("SPARK"))
       .foreach { case (k, v) => env(k) = v }
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 448353d3a5c..d9a5bd6240a 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -344,7 +344,7 @@ private[yarn] class YarnAllocator(
         val gpuResource = sparkConf.get(YARN_GPU_DEVICE)
         val fpgaResource = sparkConf.get(YARN_FPGA_DEVICE)
         getYarnResourcesAndAmounts(sparkConf, 
config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
-          customSparkResources.filterKeys { r =>
+          customSparkResources.view.filterKeys { r =>
             (r == gpuResource || r == fpgaResource)
           }
       } else {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 26b38676b07..634afb47ea5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -407,7 +407,7 @@ case class CatalogTable(
   def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
     val map = new mutable.LinkedHashMap[String, String]()
     val tableProperties =
-      
SQLConf.get.redactOptions(properties.filterKeys(!_.startsWith(VIEW_PREFIX)).toMap)
+      
SQLConf.get.redactOptions(properties.view.filterKeys(!_.startsWith(VIEW_PREFIX)).toMap)
         .toSeq.sortBy(_._1)
         .map(p => p._1 + "=" + p._2)
     val partitionColumns = 
partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
@@ -561,6 +561,7 @@ object CatalogTable {
       createTime = 0L,
       lastAccessTime = 0L,
       properties = table.properties
+        .view
         .filterKeys(!nondeterministicProps.contains(_))
         .map(identity)
         .toMap,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index cfed1761e33..6b54853a02c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -544,7 +544,7 @@ class CodegenContext extends Logging {
         s"private $className $classInstance = new $className();"
     }
 
-    val declareNestedClasses = classFunctions.filterKeys(_ != 
outerClassName).map {
+    val declareNestedClasses = classFunctions.view.filterKeys(_ != 
outerClassName).map {
       case (className, functions) =>
         s"""
            |private class $className {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 8f976a49a2b..fbc4e4109f1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -762,7 +762,7 @@ object View {
     // as optimization configs but they are still needed during the view 
resolution.
     // TODO: remove this `retainedConfigs` after the `RelationConversions` is 
moved to
     // optimization phase.
-    val retainedConfigs = activeConf.getAllConfs.filterKeys(key =>
+    val retainedConfigs = activeConf.getAllConfs.view.filterKeys(key =>
       Seq(
         "spark.sql.hive.convertMetastoreParquet",
         "spark.sql.hive.convertMetastoreOrc",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 127fae14cb1..7169be78345 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -262,7 +262,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
 
       val optionsWithPath = getOptionsWithPath(path)
 
-      val finalOptions = 
sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
+      val finalOptions = 
sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
         optionsWithPath.originalMap
       val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f67e23f21e0..5c748bcb084 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -674,7 +674,7 @@ case class DescribeTableCommand(
     )
     append(buffer, "", "", "")
     append(buffer, "# Detailed Table Information", "", "")
-    table.toLinkedHashMap.filterKeys(!excludedTableInfo.contains(_)).foreach {
+    
table.toLinkedHashMap.view.filterKeys(!excludedTableInfo.contains(_)).foreach {
       s => append(buffer, s._1, s._2, "")
     }
   }
@@ -955,7 +955,7 @@ case class ShowTablePropertiesCommand(
             Seq(Row(p, propValue))
           }
         case None =>
-          properties.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
+          properties.view.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
             .toSeq.sortBy(_._1).map(p => Row(p._1, p._2))
       }
     }
@@ -1103,7 +1103,7 @@ trait ShowCreateTableCommandBase extends SQLConfHelper {
   }
 
   private def showViewProperties(metadata: CatalogTable, builder: 
StringBuilder): Unit = {
-    val viewProps = 
metadata.properties.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
+    val viewProps = 
metadata.properties.view.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
     if (viewProps.nonEmpty) {
       val props = viewProps.toSeq.sortBy(_._1).map { case (key, value) =>
         s"'${escapeSingleQuotedString(key)}' = 
'${escapeSingleQuotedString(value)}'"
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 68f17787c89..8bb6947488a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -796,7 +796,7 @@ object DataSource extends Logging {
    */
   def buildStorageFormatFromOptions(options: Map[String, String]): 
CatalogStorageFormat = {
     val path = CaseInsensitiveMap(options).get("path")
-    val optionsWithoutPath = options.filterKeys(_.toLowerCase(Locale.ROOT) != 
"path")
+    val optionsWithoutPath = 
options.view.filterKeys(_.toLowerCase(Locale.ROOT) != "path")
     CatalogStorageFormat.empty.copy(
       locationUri = path.map(CatalogUtils.stringToURI), properties = 
optionsWithoutPath.toMap)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index df16f8161d1..babe3e88d58 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -310,7 +310,8 @@ object FileFormat {
     // fields whose values can be derived from a file status. In particular, 
we don't have accurate
     // file split information yet, nor do we have a way to provide custom 
metadata column values.
     val validFieldNames = Set(FILE_PATH, FILE_NAME, FILE_SIZE, 
FILE_MODIFICATION_TIME)
-    val extractors = 
FileFormat.BASE_METADATA_EXTRACTORS.filterKeys(validFieldNames.contains).toMap
+    val extractors =
+      
FileFormat.BASE_METADATA_EXTRACTORS.view.filterKeys(validFieldNames.contains).toMap
     assert(fieldNames.forall(validFieldNames.contains))
     val pf = PartitionedFile(
       partitionValues = partitionValues,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 57651684070..daa282fda6a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -62,7 +62,7 @@ class JDBCOptions(
    */
   val asConnectionProperties: Properties = {
     val properties = new Properties()
-    parameters.originalMap.filterKeys(key => 
!jdbcOptionNames(key.toLowerCase(Locale.ROOT)))
+    parameters.originalMap.view.filterKeys(key => 
!jdbcOptionNames(key.toLowerCase(Locale.ROOT)))
       .foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 1744df83033..b540ab6a4c9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -41,7 +41,7 @@ trait BaseCacheTableExec extends LeafV2CommandExec {
     val storageLevel = CaseInsensitiveMap(options).get(storageLevelKey)
       .map(s => StorageLevel.fromString(s.toUpperCase(Locale.ROOT)))
       .getOrElse(conf.defaultCacheStorageLevel)
-    val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != 
storageLevelKey)
+    val withoutStorageLevel = 
options.view.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
     if (withoutStorageLevel.nonEmpty) {
       logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index 574bc869de4..e4853006157 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -106,7 +106,7 @@ private[sql] object DataSourceV2Utils extends Logging {
 
     val optionsWithPath = getOptionsWithPaths(extraOptions, paths: _*)
 
-    val finalOptions = 
sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
+    val finalOptions = 
sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
       optionsWithPath.originalMap
     val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
     val (table, catalog, ident) = provider match {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 19c9b3d8bb3..03c6589f8dc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -58,7 +58,7 @@ trait FileDataSourceV2 extends TableProvider with 
DataSourceRegister {
   }
 
   protected def getOptionsWithoutPaths(map: CaseInsensitiveStringMap): 
CaseInsensitiveStringMap = {
-    val withoutPath = map.asCaseSensitiveMap().asScala.filterKeys { k =>
+    val withoutPath = map.asCaseSensitiveMap().asScala.view.filterKeys { k =>
       !k.equalsIgnoreCase("path") && !k.equalsIgnoreCase("paths")
     }
     new CaseInsensitiveStringMap(withoutPath.toMap.asJava)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
index 8f9a925ed4b..facfa567472 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
@@ -50,7 +50,7 @@ case class ShowCreateTableExec(
     showTableDataColumns(table, builder)
     showTableUsing(table, builder)
 
-    val tableOptions = table.properties.asScala
+    val tableOptions = table.properties.asScala.view
       .filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
       case (k, v) => k.drop(TableCatalog.OPTION_PREFIX.length) -> v
     }.toMap
@@ -132,7 +132,7 @@ case class ShowCreateTableExec(
       builder: StringBuilder,
       tableOptions: Map[String, String]): Unit = {
 
-    val showProps = table.properties.asScala
+    val showProps = table.properties.asScala.view
       .filterKeys(key => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key)
         && !key.startsWith(TableCatalog.OPTION_PREFIX)
         && !tableOptions.contains(key))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index a43f42a87d1..bd2e6597493 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -150,7 +150,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
   }
 
   private def toOptions(properties: Map[String, String]): Map[String, String] 
= {
-    properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
+    properties.view.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
       case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value
     }.toMap
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 3d0745c2fb3..6a62a6c52f5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -458,7 +458,7 @@ class RocksDBFileManager(
     // Get the immutable files used in previous versions, as some of those 
uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
-    val prevFilesToSizes = versionToRocksDBFiles.asScala.filterKeys(_ < 
version)
+    val prevFilesToSizes = versionToRocksDBFiles.asScala.view.filterKeys(_ < 
version)
       .values.flatten.map { f =>
       f.localFileName -> f
     }.toMap
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index d1aefdb3463..2a3028155a2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -85,9 +85,9 @@ class ExecutionPage(parent: SQLTab) extends 
WebUIPage("execution") with Logging
       summary ++
         planVisualization(request, metrics, graph) ++
         physicalPlanDescription(executionUIData.physicalPlanDescription) ++
-        
modifiedConfigs(configs.filterKeys(!_.startsWith(pandasOnSparkConfPrefix)).toMap)
 ++
+        
modifiedConfigs(configs.view.filterKeys(!_.startsWith(pandasOnSparkConfPrefix)).toMap)
 ++
         modifiedPandasOnSparkConfigs(
-          configs.filterKeys(_.startsWith(pandasOnSparkConfPrefix)).toMap)
+          configs.view.filterKeys(_.startsWith(pandasOnSparkConfPrefix)).toMap)
     }.getOrElse {
       <div>No information to display for query {executionId}</div>
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index e572cb26874..fc8f5a416ab 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -174,7 +174,7 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
       case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2] 
=>
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = sparkSession.sessionState.conf)
-        val finalOptions = 
sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
+        val finalOptions = 
sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
             optionsWithPath.originalMap
         val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
         val table = DataSourceV2Utils.getTableFromProvider(provider, 
dsOptions, userSpecifiedSchema)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index c53ecc56bb6..036afa62b48 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -384,7 +384,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
         val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = df.sparkSession.sessionState.conf)
-        val finalOptions = 
sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
+        val finalOptions = 
sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
           optionsWithPath.originalMap
         val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
         // If the source accepts external table metadata, here we pass the 
schema of input query
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 67292460bbc..5c340522f91 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -870,7 +870,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
         locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
     }
     val storageWithoutHiveGeneratedProperties = 
storageWithLocation.copy(properties =
-      
storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
+      
storageWithLocation.properties.view.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
     val schemaFromTableProps =
@@ -885,7 +885,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
       partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == 
Some(TABLE_PARTITION_PROVIDER_CATALOG),
-      properties = 
table.properties.filterKeys(!HIVE_GENERATED_TABLE_PROPERTIES(_)).toMap)
+      properties = 
table.properties.view.filterKeys(!HIVE_GENERATED_TABLE_PROPERTIES(_)).toMap)
   }
 
   override def tableExists(db: String, table: String): Boolean = withClient {
@@ -1160,14 +1160,15 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
       properties: Map[String, String],
       table: String): Option[CatalogStatistics] = {
 
-    val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+    val statsProps = 
properties.view.filterKeys(_.startsWith(STATISTICS_PREFIX))
     if (statsProps.isEmpty) {
       None
     } else {
       val colStats = new mutable.HashMap[String, CatalogColumnStat]
-      val colStatsProps = 
properties.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map {
-        case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v
-      }.toMap
+      val colStatsProps =
+        
properties.view.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map {
+          case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v
+        }.toMap
 
       // Find all the column names by matching the KEY_VERSION properties for 
them.
       colStatsProps.keys.filter {
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index dbfb4d65bd1..08f69aecdd2 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -133,12 +133,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     // Consider table and storage properties. For properties existing in both 
sides, storage
     // properties will supersede table properties.
     if (serde.contains("parquet")) {
-      val options = 
relation.tableMeta.properties.filterKeys(isParquetProperty).toMap ++
+      val options = 
relation.tableMeta.properties.view.filterKeys(isParquetProperty).toMap ++
         relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
         
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
         convertToLogicalRelation(relation, options, 
classOf[ParquetFileFormat], "parquet", isWrite)
     } else {
-      val options = 
relation.tableMeta.properties.filterKeys(isOrcProperty).toMap ++
+      val options = 
relation.tableMeta.properties.view.filterKeys(isOrcProperty).toMap ++
         relation.tableMeta.storage.properties
       if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
         convertToLogicalRelation(
@@ -377,6 +377,7 @@ private[hive] object HiveMetastoreCatalog {
     // Find any nullable fields in metastore schema that are missing from the 
inferred schema.
     val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> 
f).toMap
     val missingNullables = metastoreFields
+      .view
       .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
       .values
       .filter(_.nullable)
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
index 044a515fbdc..44f0e43d11e 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
@@ -85,7 +85,7 @@ class HiveOptions(@transient private val parameters: 
CaseInsensitiveMap[String])
       s"line delimiter, but given: $lineDelim.")
   }
 
-  def serdeProperties: Map[String, String] = parameters.filterKeys {
+  def serdeProperties: Map[String, String] = parameters.view.filterKeys {
     k => !lowerCasedOptionNames.contains(k.toLowerCase(Locale.ROOT))
   }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }.toMap
 }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
index ebf02287489..400befb91d9 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -124,7 +124,7 @@ class HiveSchemaInferenceSuite
     // properties out).
     assert(!externalCatalog.getTable(DATABASE, 
TEST_TABLE_NAME).schemaPreservesCase)
     val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
-    
assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty)
+    
assert(rawTable.properties.view.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty)
 
     // Add partition records (if specified)
     if (!partitionCols.isEmpty) {
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 507c482525c..808defa359c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -892,7 +892,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase 
with TestHiveSingleto
    */
   private def getStatsProperties(tableName: String): Map[String, String] = {
     val hTable = 
hiveClient.getTable(spark.sessionState.catalog.getCurrentDatabase, tableName)
-    hTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)).toMap
+    hTable.properties.view.filterKeys(_.startsWith(STATISTICS_PREFIX)).toMap
   }
 
   test("change stats after insert command for hive table") {
@@ -1130,7 +1130,8 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
     def checkColStatsProps(expected: Map[String, String]): Unit = {
       sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS " + 
stats.keys.mkString(", "))
       val table = hiveClient.getTable("default", tableName)
-      val props = 
table.properties.filterKeys(_.startsWith("spark.sql.statistics.colStats")).toMap
+      val props =
+        
table.properties.view.filterKeys(_.startsWith("spark.sql.statistics.colStats")).toMap
       assert(props == expected)
     }
 
@@ -1199,11 +1200,11 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS cint, 
ctimestamp")
         val table = hiveClient.getTable("default", tableName)
         val intHistogramProps = table.properties
-          
.filterKeys(_.startsWith("spark.sql.statistics.colStats.cint.histogram"))
+          
.view.filterKeys(_.startsWith("spark.sql.statistics.colStats.cint.histogram"))
         assert(intHistogramProps.size == 1)
 
         val tsHistogramProps = table.properties
-          
.filterKeys(_.startsWith("spark.sql.statistics.colStats.ctimestamp.histogram"))
+          
.view.filterKeys(_.startsWith("spark.sql.statistics.colStats.ctimestamp.histogram"))
         assert(tsHistogramProps.size == 1)
 
         // Validate histogram after deserialization.
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index b6971204b1c..62b2d849c68 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -132,7 +132,7 @@ class HiveDDLSuite
       createTime = 0L,
       lastAccessTime = 0L,
       owner = "",
-      properties = 
table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap,
+      properties = 
table.properties.view.filterKeys(!nondeterministicProps.contains(_)).toMap,
       // View texts are checked separately
       viewText = None
     )
@@ -1088,7 +1088,7 @@ class HiveDDLSuite
       expectedSerdeProps.map { case (k, v) => s"'$k'='$v'" }.mkString(", ")
     val oldPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" 
-> "4"))
     assert(oldPart.storage.serde != Some(expectedSerde), "bad test: serde was 
already set")
-    assert(oldPart.storage.properties.filterKeys(expectedSerdeProps.contains) 
!=
+    
assert(oldPart.storage.properties.view.filterKeys(expectedSerdeProps.contains) 
!=
       expectedSerdeProps, "bad test: serde properties were already set")
     sql(s"""ALTER TABLE boxes PARTITION (width=4)
       |    SET SERDE '$expectedSerde'
@@ -1096,7 +1096,7 @@ class HiveDDLSuite
       |""".stripMargin)
     val newPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" 
-> "4"))
     assert(newPart.storage.serde == Some(expectedSerde))
-    
assert(newPart.storage.properties.filterKeys(expectedSerdeProps.contains).toMap 
==
+    
assert(newPart.storage.properties.view.filterKeys(expectedSerdeProps.contains).toMap
 ==
       expectedSerdeProps)
   }
 
@@ -1697,7 +1697,8 @@ class HiveDDLSuite
       "maxFileSize",
       "minFileSize"
     )
-    
assert(targetTable.properties.filterKeys(!metastoreGeneratedProperties.contains(_)).isEmpty,
+    assert(
+      
targetTable.properties.view.filterKeys(!metastoreGeneratedProperties.contains(_)).isEmpty,
       "the table properties of source tables should not be copied in the 
created table")
 
     provider match {
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
index 5f8f250f8e9..c7e19e943e6 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
@@ -246,7 +246,7 @@ class ShowCreateTableSuite extends 
v1.ShowCreateTableSuiteBase with CommandSuite
       table.copy(
         createTime = 0L,
         lastAccessTime = 0L,
-        properties = 
table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap,
+        properties = 
table.properties.view.filterKeys(!nondeterministicProps.contains(_)).toMap,
         stats = None,
         ignoredProperties = Map.empty,
         storage = table.storage.copy(properties = Map.empty),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to