This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 77c790cdfec [SPARK-45602][CORE][SQL][SS][YARN][K8S] Replace `s.c.MapOps.filterKeys` with `s.c.MapOps.view.filterKeys`
77c790cdfec is described below
commit 77c790cdfec8fd1e49a7f479217487ea370bc25c
Author: yangjie01 <[email protected]>
AuthorDate: Fri Oct 20 23:01:27 2023 +0800
[SPARK-45602][CORE][SQL][SS][YARN][K8S] Replace `s.c.MapOps.filterKeys` with `s.c.MapOps.view.filterKeys`
### What changes were proposed in this pull request?
This PR replaces `s.c.MapOps.filterKeys` with `s.c.MapOps.view.filterKeys` because `s.c.MapOps.filterKeys` has been deprecated since Scala 2.13.0:
https://github.com/scala/scala/blob/bf45e199e96383b96a6955520d7d2524c78e6e12/src/library/scala/collection/Map.scala#L248-L254
```scala
@deprecated("Use .view.filterKeys(f). A future version will include a strict version of this method (for now, .view.filterKeys(p).toMap).", "2.13.0")
def filterKeys(p: K => Boolean): MapView[K, V] = new MapView.FilterKeys(this, p)
```
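For illustration, here is a minimal sketch of the migration pattern applied throughout this change; the `FilterKeysMigration` object, the `conf` map, and its keys are made-up examples rather than code taken from Spark:
```scala
import scala.collection.MapView

object FilterKeysMigration {
  def main(args: Array[String]): Unit = {
    // A made-up config map, only for demonstration.
    val conf = Map("spark.sql.shuffle.partitions" -> "200", "other.key" -> "x")

    // Before (deprecated since Scala 2.13.0, emits a compiler warning):
    //   conf.filterKeys(_.startsWith("spark."))

    // After: go through a lazy MapView first ...
    val sparkKeysView: MapView[String, String] = conf.view.filterKeys(_.startsWith("spark."))

    // ... and materialize with .toMap wherever a strict Map is required.
    val sparkKeys: Map[String, String] = sparkKeysView.toMap

    assert(sparkKeys == Map("spark.sql.shuffle.partitions" -> "200"))
  }
}
```
Since `view.filterKeys` returns a lazy `MapView`, call sites that need a strict `Map` keep a trailing `.toMap`, as most of the call sites in this diff already do.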
### Why are the changes needed?
Clean up deprecated API usage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43445 from LuciferYang/SPARK-45602.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../apache/spark/sql/kafka010/KafkaContinuousStream.scala | 2 +-
.../main/scala/org/apache/spark/deploy/master/Master.scala | 1 +
.../org/apache/spark/deploy/rest/RestSubmissionClient.scala | 2 +-
.../spark/executor/CoarseGrainedExecutorBackend.scala | 4 ++--
.../scala/org/apache/spark/resource/ResourceProfile.scala | 4 ++--
.../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +-
.../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
.../scheduler/cluster/StandaloneSchedulerBackend.scala | 2 +-
.../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 2 +-
core/src/main/scala/org/apache/spark/ui/PagedTable.scala | 1 +
.../scala/org/apache/spark/HeartbeatReceiverSuite.scala | 2 +-
.../test/scala/org/apache/spark/SparkThrowableSuite.scala | 1 +
.../apache/spark/internal/plugin/PluginContainerSuite.scala | 2 +-
.../spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 4 ++--
.../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 2 +-
.../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +-
.../org/apache/spark/sql/catalyst/catalog/interface.scala | 3 ++-
.../sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 +-
.../sql/catalyst/plans/logical/basicLogicalOperators.scala | 2 +-
.../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +-
.../org/apache/spark/sql/execution/command/tables.scala | 6 +++---
.../apache/spark/sql/execution/datasources/DataSource.scala | 2 +-
.../apache/spark/sql/execution/datasources/FileFormat.scala | 3 ++-
.../spark/sql/execution/datasources/jdbc/JDBCOptions.scala | 2 +-
.../spark/sql/execution/datasources/v2/CacheTableExec.scala | 2 +-
.../sql/execution/datasources/v2/DataSourceV2Utils.scala | 2 +-
.../sql/execution/datasources/v2/FileDataSourceV2.scala | 2 +-
.../sql/execution/datasources/v2/ShowCreateTableExec.scala | 4 ++--
.../sql/execution/datasources/v2/V2SessionCatalog.scala | 2 +-
.../sql/execution/streaming/state/RocksDBFileManager.scala | 2 +-
.../org/apache/spark/sql/execution/ui/ExecutionPage.scala | 4 ++--
.../org/apache/spark/sql/streaming/DataStreamReader.scala | 2 +-
.../org/apache/spark/sql/streaming/DataStreamWriter.scala | 2 +-
.../org/apache/spark/sql/hive/HiveExternalCatalog.scala | 13 +++++++------
.../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 5 +++--
.../org/apache/spark/sql/hive/execution/HiveOptions.scala | 2 +-
.../apache/spark/sql/hive/HiveSchemaInferenceSuite.scala | 2 +-
.../scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 9 +++++----
.../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 9 +++++----
.../sql/hive/execution/command/ShowCreateTableSuite.scala | 2 +-
40 files changed, 65 insertions(+), 56 deletions(-)
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 0ca39817279..026c4d56072 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -102,7 +102,7 @@ class KafkaContinuousStream(
}
val startOffsets = newPartitionOffsets ++
- oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
+ oldStartPartitionOffsets.view.filterKeys(!deletedPartitions.contains(_))
knownPartitions = startOffsets.keySet
startOffsets.toSeq.map {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e98f08d7ec4..46503e017ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -917,6 +917,7 @@ private[deploy] class Master(
private def decommissionWorkersOnHosts(hostnames: Seq[String]): Integer = {
val hostnamesSet = hostnames.map(_.toLowerCase(Locale.ROOT)).toSet
val workersToRemove = addressToWorker
+ .view
.filterKeys(addr =>
hostnamesSet.contains(addr.host.toLowerCase(Locale.ROOT)))
.values
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index a690937c202..68f08dd951e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -422,7 +422,7 @@ private[spark] object RestSubmissionClient {
* Filter non-spark environment variables from any environment.
*/
private[rest] def filterSystemEnvironment(env: Map[String, String]):
Map[String, String] = {
- env.filterKeys { k =>
+ env.view.filterKeys { k =>
k.startsWith("SPARK_") && !EXCLUDED_SPARK_ENV_VARS.contains(k)
}.toMap
}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 299148a912a..b074ac814a9 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -154,13 +154,13 @@ private[spark] class CoarseGrainedExecutorBackend(
def extractLogUrls: Map[String, String] = {
val prefix = "SPARK_LOG_URL_"
- sys.env.filterKeys(_.startsWith(prefix))
+ sys.env.view.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT),
e._2)).toMap
}
def extractAttributes: Map[String, String] = {
val prefix = "SPARK_EXECUTOR_ATTRIBUTE_"
- sys.env.filterKeys(_.startsWith(prefix))
+ sys.env.view.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT),
e._2)).toMap
}
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 6d6197d58a1..fdaa68b6931 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -95,12 +95,12 @@ class ResourceProfile(
}
private[spark] def getCustomTaskResources(): Map[String,
TaskResourceRequest] = {
- taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
+ taskResources.view.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
}
protected[spark] def getCustomExecutorResources(): Map[String,
ExecutorResourceRequest] = {
executorResources.
- filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
+ view.filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
}
/*
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 07a71ebed08..230e49db536 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -838,7 +838,7 @@ private[spark] class DAGScheduler(
if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
logError("No stages registered for job " + job.jobId)
} else {
- stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {
+ stageIdToStage.view.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {
case (stageId, stage) =>
val jobSet = stage.jobIds
if (!jobSet.contains(job.jobId)) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c770e5c9950..274f867ce35 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -371,7 +371,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make sure no executor is killed while some task is launching on it
val taskDescs = withLock {
// Filter out executors under killing
- val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
+ val activeExecutors = executorDataMap.view.filterKeys(isExecutorActive)
val workOffers = activeExecutors.map {
case (id, executorData) => buildWorkerOffer(id, executorData)
}.toIndexedSeq
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 9f71a0fe58c..f1be78ab93d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -253,7 +253,7 @@ private[spark] class StandaloneSchedulerBackend(
override def getDriverLogUrls: Option[Map[String, String]] = {
val prefix = "SPARK_DRIVER_LOG_URL_"
- val driverLogUrls = sys.env.filterKeys(_.startsWith(prefix))
+ val driverLogUrls = sys.env.view.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT),
e._2)).toMap
if (driverLogUrls.nonEmpty) Some(driverLogUrls) else None
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b21a2aa1c17..d539611271c 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -653,7 +653,7 @@ final class ShuffleBlockFetcherIterator(
hostLocalDirManager.getHostLocalDirs(host, port,
bmIds.map(_.executorId)) {
case Success(dirsByExecId) =>
fetchMultipleHostLocalBlocks(
- hostLocalBlocksWithMissingDirs.filterKeys(bmIds.contains).toMap,
+ hostLocalBlocksWithMissingDirs.view.filterKeys(bmIds.contains).toMap,
dirsByExecId,
cached = false)
diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 99052628f6e..208b8189b4d 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -207,6 +207,7 @@ private[spark] trait PagedTable[T] {
.withKeyValueSeparator("=")
.split(search)
.asScala
+ .view
.filterKeys(_ != pageSizeFormField)
.filterKeys(_ != pageNumberFormField)
.mapValues(URLDecoder.decode(_, UTF_8.name()))
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index a8351322e01..95ef417239c 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -306,7 +306,7 @@ class HeartbeatReceiverSuite
// We may receive undesired SparkListenerExecutorAdded from
LocalSchedulerBackend,
// so exclude it from the map. See SPARK-10800.
heartbeatReceiver.invokePrivate(_executorLastSeen()).
- filterKeys(_ != SparkContext.DRIVER_IDENTIFIER).toMap
+ view.filterKeys(_ != SparkContext.DRIVER_IDENTIFIER).toMap
}
}
diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index 4d011398c63..fb82714949b 100644
--- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -152,6 +152,7 @@ class SparkThrowableSuite extends SparkFunSuite {
test("Message format invariants") {
val messageFormats = errorReader.errorInfoMap
+ .view
.filterKeys(!_.startsWith("_LEGACY_ERROR_"))
.filterKeys(!_.startsWith("INTERNAL_ERROR"))
.values.toSeq.flatMap { i => Seq(i.messageTemplate) }
diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
index 4bfa624fbd4..ef214bd50d9 100644
--- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -264,7 +264,7 @@ object NonLocalModeSparkPlugin {
resources: Map[String, ResourceInformation]): String = {
// try to keep this simple and only write the gpus addresses, if we add
more resources need to
// make more complex
- val resourcesString = resources.filterKeys(_.equals(GPU)).map {
+ val resourcesString = resources.view.filterKeys(_.equals(GPU)).map {
case (_, ri) =>
s"${ri.addresses.mkString(",")}"
}.mkString(",")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index e6c2171058d..e678489e100 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -187,7 +187,7 @@ class ExecutorPodsAllocator(
// to the schedulerKnownNewlyCreatedExecs
val schedulerKnownExecs =
schedulerBackend.getExecutorIds().map(_.toLong).toSet
schedulerKnownNewlyCreatedExecs ++=
- newlyCreatedExecutors.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1)
+ newlyCreatedExecutors.view.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1)
newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet
// For all executors we've created against the API but have not seen in a
snapshot
@@ -239,7 +239,7 @@ class ExecutorPodsAllocator(
_deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs)
}
- val notDeletedPods = lastSnapshot.executorPods.filterKeys(!_deletedExecutorIds.contains(_))
+ val notDeletedPods = lastSnapshot.executorPods.view.filterKeys(!_deletedExecutorIds.contains(_))
// Map the pods into per ResourceProfile id so we can check per
ResourceProfile,
// add a fast path if not using other ResourceProfiles.
val rpIdToExecsAndPodState =
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index d4c24e8a39f..cdaf04173c2 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -205,7 +205,7 @@ private[yarn] class ExecutorRunnable(
val env = new HashMap[String, String]()
Client.populateClasspath(null, conf, sparkConf, env,
sparkConf.get(EXECUTOR_CLASS_PATH))
- System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
+ System.getenv().asScala.view.filterKeys(_.startsWith("SPARK"))
.foreach { case (k, v) => env(k) = v }
sparkConf.getExecutorEnv.foreach { case (key, value) =>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 448353d3a5c..d9a5bd6240a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -344,7 +344,7 @@ private[yarn] class YarnAllocator(
val gpuResource = sparkConf.get(YARN_GPU_DEVICE)
val fpgaResource = sparkConf.get(YARN_FPGA_DEVICE)
getYarnResourcesAndAmounts(sparkConf,
config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
- customSparkResources.filterKeys { r =>
+ customSparkResources.view.filterKeys { r =>
(r == gpuResource || r == fpgaResource)
}
} else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 26b38676b07..634afb47ea5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -407,7 +407,7 @@ case class CatalogTable(
def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
val map = new mutable.LinkedHashMap[String, String]()
val tableProperties =
- SQLConf.get.redactOptions(properties.filterKeys(!_.startsWith(VIEW_PREFIX)).toMap)
+ SQLConf.get.redactOptions(properties.view.filterKeys(!_.startsWith(VIEW_PREFIX)).toMap)
.toSeq.sortBy(_._1)
.map(p => p._1 + "=" + p._2)
val partitionColumns =
partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
@@ -561,6 +561,7 @@ object CatalogTable {
createTime = 0L,
lastAccessTime = 0L,
properties = table.properties
+ .view
.filterKeys(!nondeterministicProps.contains(_))
.map(identity)
.toMap,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index cfed1761e33..6b54853a02c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -544,7 +544,7 @@ class CodegenContext extends Logging {
s"private $className $classInstance = new $className();"
}
- val declareNestedClasses = classFunctions.filterKeys(_ != outerClassName).map {
+ val declareNestedClasses = classFunctions.view.filterKeys(_ != outerClassName).map {
case (className, functions) =>
s"""
|private class $className {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 8f976a49a2b..fbc4e4109f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -762,7 +762,7 @@ object View {
// as optimization configs but they are still needed during the view
resolution.
// TODO: remove this `retainedConfigs` after the `RelationConversions` is
moved to
// optimization phase.
- val retainedConfigs = activeConf.getAllConfs.filterKeys(key =>
+ val retainedConfigs = activeConf.getAllConfs.view.filterKeys(key =>
Seq(
"spark.sql.hive.convertMetastoreParquet",
"spark.sql.hive.convertMetastoreOrc",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 127fae14cb1..7169be78345 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -262,7 +262,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val optionsWithPath = getOptionsWithPath(path)
- val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
+ val finalOptions = sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
optionsWithPath.originalMap
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f67e23f21e0..5c748bcb084 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -674,7 +674,7 @@ case class DescribeTableCommand(
)
append(buffer, "", "", "")
append(buffer, "# Detailed Table Information", "", "")
- table.toLinkedHashMap.filterKeys(!excludedTableInfo.contains(_)).foreach {
+ table.toLinkedHashMap.view.filterKeys(!excludedTableInfo.contains(_)).foreach {
s => append(buffer, s._1, s._2, "")
}
}
@@ -955,7 +955,7 @@ case class ShowTablePropertiesCommand(
Seq(Row(p, propValue))
}
case None =>
- properties.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
+ properties.view.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
.toSeq.sortBy(_._1).map(p => Row(p._1, p._2))
}
}
@@ -1103,7 +1103,7 @@ trait ShowCreateTableCommandBase extends SQLConfHelper {
}
private def showViewProperties(metadata: CatalogTable, builder:
StringBuilder): Unit = {
- val viewProps = metadata.properties.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
+ val viewProps = metadata.properties.view.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
if (viewProps.nonEmpty) {
val props = viewProps.toSeq.sortBy(_._1).map { case (key, value) =>
s"'${escapeSingleQuotedString(key)}' =
'${escapeSingleQuotedString(value)}'"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 68f17787c89..8bb6947488a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -796,7 +796,7 @@ object DataSource extends Logging {
*/
def buildStorageFormatFromOptions(options: Map[String, String]):
CatalogStorageFormat = {
val path = CaseInsensitiveMap(options).get("path")
- val optionsWithoutPath = options.filterKeys(_.toLowerCase(Locale.ROOT) != "path")
+ val optionsWithoutPath = options.view.filterKeys(_.toLowerCase(Locale.ROOT) != "path")
CatalogStorageFormat.empty.copy(
locationUri = path.map(CatalogUtils.stringToURI), properties =
optionsWithoutPath.toMap)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index df16f8161d1..babe3e88d58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -310,7 +310,8 @@ object FileFormat {
// fields whose values can be derived from a file status. In particular,
we don't have accurate
// file split information yet, nor do we have a way to provide custom
metadata column values.
val validFieldNames = Set(FILE_PATH, FILE_NAME, FILE_SIZE,
FILE_MODIFICATION_TIME)
- val extractors = FileFormat.BASE_METADATA_EXTRACTORS.filterKeys(validFieldNames.contains).toMap
+ val extractors =
+ FileFormat.BASE_METADATA_EXTRACTORS.view.filterKeys(validFieldNames.contains).toMap
assert(fieldNames.forall(validFieldNames.contains))
val pf = PartitionedFile(
partitionValues = partitionValues,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 57651684070..daa282fda6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -62,7 +62,7 @@ class JDBCOptions(
*/
val asConnectionProperties: Properties = {
val properties = new Properties()
- parameters.originalMap.filterKeys(key => !jdbcOptionNames(key.toLowerCase(Locale.ROOT)))
+ parameters.originalMap.view.filterKeys(key => !jdbcOptionNames(key.toLowerCase(Locale.ROOT)))
.foreach { case (k, v) => properties.setProperty(k, v) }
properties
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 1744df83033..b540ab6a4c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -41,7 +41,7 @@ trait BaseCacheTableExec extends LeafV2CommandExec {
val storageLevel = CaseInsensitiveMap(options).get(storageLevelKey)
.map(s => StorageLevel.fromString(s.toUpperCase(Locale.ROOT)))
.getOrElse(conf.defaultCacheStorageLevel)
- val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
+ val withoutStorageLevel = options.view.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
if (withoutStorageLevel.nonEmpty) {
logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index 574bc869de4..e4853006157 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -106,7 +106,7 @@ private[sql] object DataSourceV2Utils extends Logging {
val optionsWithPath = getOptionsWithPaths(extraOptions, paths: _*)
- val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
+ val finalOptions = sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
optionsWithPath.originalMap
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
val (table, catalog, ident) = provider match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 19c9b3d8bb3..03c6589f8dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -58,7 +58,7 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
}
protected def getOptionsWithoutPaths(map: CaseInsensitiveStringMap):
CaseInsensitiveStringMap = {
- val withoutPath = map.asCaseSensitiveMap().asScala.filterKeys { k =>
+ val withoutPath = map.asCaseSensitiveMap().asScala.view.filterKeys { k =>
!k.equalsIgnoreCase("path") && !k.equalsIgnoreCase("paths")
}
new CaseInsensitiveStringMap(withoutPath.toMap.asJava)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
index 8f9a925ed4b..facfa567472 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
@@ -50,7 +50,7 @@ case class ShowCreateTableExec(
showTableDataColumns(table, builder)
showTableUsing(table, builder)
- val tableOptions = table.properties.asScala
+ val tableOptions = table.properties.asScala.view
.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
case (k, v) => k.drop(TableCatalog.OPTION_PREFIX.length) -> v
}.toMap
@@ -132,7 +132,7 @@ case class ShowCreateTableExec(
builder: StringBuilder,
tableOptions: Map[String, String]): Unit = {
- val showProps = table.properties.asScala
+ val showProps = table.properties.asScala.view
.filterKeys(key => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key)
&& !key.startsWith(TableCatalog.OPTION_PREFIX)
&& !tableOptions.contains(key))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index a43f42a87d1..bd2e6597493 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -150,7 +150,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
}
private def toOptions(properties: Map[String, String]): Map[String, String]
= {
- properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
+ properties.view.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value
}.toMap
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 3d0745c2fb3..6a62a6c52f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -458,7 +458,7 @@ class RocksDBFileManager(
// Get the immutable files used in previous versions, as some of those
uploaded files can be
// reused for this version
logInfo(s"Saving RocksDB files to DFS for $version")
- val prevFilesToSizes = versionToRocksDBFiles.asScala.filterKeys(_ < version)
+ val prevFilesToSizes = versionToRocksDBFiles.asScala.view.filterKeys(_ < version)
.values.flatten.map { f =>
f.localFileName -> f
}.toMap
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index d1aefdb3463..2a3028155a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -85,9 +85,9 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
summary ++
planVisualization(request, metrics, graph) ++
physicalPlanDescription(executionUIData.physicalPlanDescription) ++
- modifiedConfigs(configs.filterKeys(!_.startsWith(pandasOnSparkConfPrefix)).toMap) ++
+ modifiedConfigs(configs.view.filterKeys(!_.startsWith(pandasOnSparkConfPrefix)).toMap) ++
modifiedPandasOnSparkConfigs(
- configs.filterKeys(_.startsWith(pandasOnSparkConfPrefix)).toMap)
+ configs.view.filterKeys(_.startsWith(pandasOnSparkConfPrefix)).toMap)
}.getOrElse {
<div>No information to display for query {executionId}</div>
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index e572cb26874..fc8f5a416ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -174,7 +174,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2]
=>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = sparkSession.sessionState.conf)
- val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
+ val finalOptions = sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
optionsWithPath.originalMap
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
val table = DataSourceV2Utils.getTableFromProvider(provider,
dsOptions, userSpecifiedSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index c53ecc56bb6..036afa62b48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -384,7 +384,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
val provider =
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = df.sparkSession.sessionState.conf)
- val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
+ val finalOptions = sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
optionsWithPath.originalMap
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
// If the source accepts external table metadata, here we pass the
schema of input query
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 67292460bbc..5c340522f91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -870,7 +870,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
}
val storageWithoutHiveGeneratedProperties =
storageWithLocation.copy(properties =
- storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
+ storageWithLocation.properties.view.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
val schemaFromTableProps =
@@ -885,7 +885,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
partitionColumnNames = partColumnNames,
bucketSpec = getBucketSpecFromTableProperties(table),
tracksPartitionsInCatalog = partitionProvider ==
Some(TABLE_PARTITION_PROVIDER_CATALOG),
- properties = table.properties.filterKeys(!HIVE_GENERATED_TABLE_PROPERTIES(_)).toMap)
+ properties = table.properties.view.filterKeys(!HIVE_GENERATED_TABLE_PROPERTIES(_)).toMap)
}
override def tableExists(db: String, table: String): Boolean = withClient {
@@ -1160,14 +1160,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
properties: Map[String, String],
table: String): Option[CatalogStatistics] = {
- val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+ val statsProps = properties.view.filterKeys(_.startsWith(STATISTICS_PREFIX))
if (statsProps.isEmpty) {
None
} else {
val colStats = new mutable.HashMap[String, CatalogColumnStat]
- val colStatsProps = properties.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map {
- case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v
- }.toMap
+ val colStatsProps =
+ properties.view.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map {
+ case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v
+ }.toMap
// Find all the column names by matching the KEY_VERSION properties for
them.
colStatsProps.keys.filter {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index dbfb4d65bd1..08f69aecdd2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -133,12 +133,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// Consider table and storage properties. For properties existing in both
sides, storage
// properties will supersede table properties.
if (serde.contains("parquet")) {
- val options = relation.tableMeta.properties.filterKeys(isParquetProperty).toMap ++
+ val options = relation.tableMeta.properties.view.filterKeys(isParquetProperty).toMap ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
convertToLogicalRelation(relation, options,
classOf[ParquetFileFormat], "parquet", isWrite)
} else {
- val options = relation.tableMeta.properties.filterKeys(isOrcProperty).toMap ++
+ val options = relation.tableMeta.properties.view.filterKeys(isOrcProperty).toMap ++
relation.tableMeta.storage.properties
if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
convertToLogicalRelation(
@@ -377,6 +377,7 @@ private[hive] object HiveMetastoreCatalog {
// Find any nullable fields in metastore schema that are missing from the
inferred schema.
val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase ->
f).toMap
val missingNullables = metastoreFields
+ .view
.filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
.values
.filter(_.nullable)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
index 044a515fbdc..44f0e43d11e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
@@ -85,7 +85,7 @@ class HiveOptions(@transient private val parameters: CaseInsensitiveMap[String])
s"line delimiter, but given: $lineDelim.")
}
- def serdeProperties: Map[String, String] = parameters.filterKeys {
+ def serdeProperties: Map[String, String] = parameters.view.filterKeys {
k => !lowerCasedOptionNames.contains(k.toLowerCase(Locale.ROOT))
}.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }.toMap
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
index ebf02287489..400befb91d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -124,7 +124,7 @@ class HiveSchemaInferenceSuite
// properties out).
assert(!externalCatalog.getTable(DATABASE,
TEST_TABLE_NAME).schemaPreservesCase)
val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
- assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty)
+ assert(rawTable.properties.view.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty)
// Add partition records (if specified)
if (!partitionCols.isEmpty) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 507c482525c..808defa359c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -892,7 +892,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
*/
private def getStatsProperties(tableName: String): Map[String, String] = {
val hTable =
hiveClient.getTable(spark.sessionState.catalog.getCurrentDatabase, tableName)
- hTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)).toMap
+ hTable.properties.view.filterKeys(_.startsWith(STATISTICS_PREFIX)).toMap
}
test("change stats after insert command for hive table") {
@@ -1130,7 +1130,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
def checkColStatsProps(expected: Map[String, String]): Unit = {
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS " +
stats.keys.mkString(", "))
val table = hiveClient.getTable("default", tableName)
- val props = table.properties.filterKeys(_.startsWith("spark.sql.statistics.colStats")).toMap
+ val props =
+ table.properties.view.filterKeys(_.startsWith("spark.sql.statistics.colStats")).toMap
assert(props == expected)
}
@@ -1199,11 +1200,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS cint,
ctimestamp")
val table = hiveClient.getTable("default", tableName)
val intHistogramProps = table.properties
- .filterKeys(_.startsWith("spark.sql.statistics.colStats.cint.histogram"))
+ .view.filterKeys(_.startsWith("spark.sql.statistics.colStats.cint.histogram"))
assert(intHistogramProps.size == 1)
val tsHistogramProps = table.properties
- .filterKeys(_.startsWith("spark.sql.statistics.colStats.ctimestamp.histogram"))
+ .view.filterKeys(_.startsWith("spark.sql.statistics.colStats.ctimestamp.histogram"))
assert(tsHistogramProps.size == 1)
// Validate histogram after deserialization.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index b6971204b1c..62b2d849c68 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -132,7 +132,7 @@ class HiveDDLSuite
createTime = 0L,
lastAccessTime = 0L,
owner = "",
- properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap,
+ properties = table.properties.view.filterKeys(!nondeterministicProps.contains(_)).toMap,
// View texts are checked separately
viewText = None
)
@@ -1088,7 +1088,7 @@ class HiveDDLSuite
expectedSerdeProps.map { case (k, v) => s"'$k'='$v'" }.mkString(", ")
val oldPart = catalog.getPartition(TableIdentifier("boxes"), Map("width"
-> "4"))
assert(oldPart.storage.serde != Some(expectedSerde), "bad test: serde was
already set")
- assert(oldPart.storage.properties.filterKeys(expectedSerdeProps.contains) !=
+ assert(oldPart.storage.properties.view.filterKeys(expectedSerdeProps.contains) !=
expectedSerdeProps, "bad test: serde properties were already set")
sql(s"""ALTER TABLE boxes PARTITION (width=4)
| SET SERDE '$expectedSerde'
@@ -1096,7 +1096,7 @@ class HiveDDLSuite
|""".stripMargin)
val newPart = catalog.getPartition(TableIdentifier("boxes"), Map("width"
-> "4"))
assert(newPart.storage.serde == Some(expectedSerde))
- assert(newPart.storage.properties.filterKeys(expectedSerdeProps.contains).toMap ==
+ assert(newPart.storage.properties.view.filterKeys(expectedSerdeProps.contains).toMap ==
expectedSerdeProps)
}
@@ -1697,7 +1697,8 @@ class HiveDDLSuite
"maxFileSize",
"minFileSize"
)
- assert(targetTable.properties.filterKeys(!metastoreGeneratedProperties.contains(_)).isEmpty,
+ assert(
+ targetTable.properties.view.filterKeys(!metastoreGeneratedProperties.contains(_)).isEmpty,
"the table properties of source tables should not be copied in the
created table")
provider match {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
index 5f8f250f8e9..c7e19e943e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
@@ -246,7 +246,7 @@ class ShowCreateTableSuite extends v1.ShowCreateTableSuiteBase with CommandSuite
table.copy(
createTime = 0L,
lastAccessTime = 0L,
- properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap,
+ properties = table.properties.view.filterKeys(!nondeterministicProps.contains(_)).toMap,
stats = None,
ignoredProperties = Map.empty,
storage = table.storage.copy(properties = Map.empty),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]