This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 5db7beb59a67 [SPARK-46455][CORE][SQL][SS][CONNECT][PYTHON] Remove redundant type conversion
5db7beb59a67 is described below
commit 5db7beb59a673f05e8f39aa9653cb0497a6c97cf
Author: yangjie01 <[email protected]>
AuthorDate: Sun Dec 24 14:44:14 2023 -0800
[SPARK-46455][CORE][SQL][SS][CONNECT][PYTHON] Remove redundant type conversion
### What changes were proposed in this pull request?
This PR aims to clean up redundant type conversions in Spark production code.
### Why are the changes needed?
Code clean up.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44412 from LuciferYang/cleanup-redundant-conversion.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala | 2 +-
.../spark/sql/connect/execution/ExecuteGrpcResponseSender.scala | 4 ++--
.../spark/sql/connect/execution/ExecuteResponseObserver.scala | 2 +-
.../spark/sql/connect/service/SparkConnectExecutionManager.scala | 4 ++--
.../org/apache/spark/examples/streaming/KinesisWordCountASL.scala | 4 ++--
.../scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala | 2 +-
core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 2 +-
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala | 2 +-
sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala | 4 ++--
.../scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala | 2 +-
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 2 +-
.../org/apache/spark/sql/execution/adaptive/QueryStageExec.scala | 2 +-
.../spark/sql/execution/command/AnalyzePartitionCommand.scala | 2 +-
.../apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala | 2 +-
.../org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 6 +++---
18 files changed, 24 insertions(+), 24 deletions(-)
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 29b9fdf9dfb9..9e10fac8bb55 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -327,7 +327,7 @@ private[sql] class AvroDeserializer(
if (nonNullTypes.length == 1) {
newWriter(nonNullTypes.head, catalystType, avroPath, catalystPath)
} else {
- nonNullTypes.map(_.getType).toSeq match {
+ nonNullTypes.map(_.getType) match {
case Seq(a, b) if Set(a, b) == Set(INT, LONG) && catalystType == LongType =>
(updater, ordinal, value) =>
value match {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 115cedfe1128..c9ceef969e29 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -158,7 +158,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
Long.MaxValue
} else {
val confSize =
- SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION).toLong
+ SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION)
if (confSize > 0) System.currentTimeMillis() + confSize else Long.MaxValue
}
@@ -167,7 +167,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
Long.MaxValue
} else {
val confSize =
- SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE).toLong
+ SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE)
if (confSize > 0) confSize else Long.MaxValue
}
var sentResponsesSize: Long = 0
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index b5844486b73a..a7877503f461 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -102,7 +102,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
* value greater than 0 will buffer the response from getResponse.
*/
private val retryBufferSize = if (executeHolder.reattachable) {
- SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE).toLong
+ SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE)
} else {
0
}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index d8d9cee3dad4..c90f53ac07df 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -188,13 +188,13 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
scheduledExecutor match {
case Some(_) => // Already running.
case None =>
- val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL).toLong
+ val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL)
logInfo(s"Starting thread for cleanup of abandoned executions every $interval ms")
scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
scheduledExecutor.get.scheduleAtFixedRate(
() => {
try {
- val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT).toLong
+ val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT)
periodicMaintenance(timeout)
} catch {
case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index 7d12af3256f1..4835e9de086c 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -230,9 +230,9 @@ object KinesisWordProducerASL {
// Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
for (i <- 1 to 10) {
// Generate recordsPerSec records to put onto the stream
- val records = (1 to recordsPerSecond.toInt).foreach { recordNum =>
+ val records = (1 to recordsPerSecond).foreach { recordNum =>
// Randomly generate wordsPerRecord number of words
- val data = (1 to wordsPerRecord.toInt).map(x => {
+ val data = (1 to wordsPerRecord).map(x => {
// Get a random index to a word
val randomWordIdx = Random.nextInt(randomWords.size)
val randomWord = randomWords(randomWordIdx)
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index d0eee9c83c20..406c19be9bff 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -130,7 +130,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
val producer = getProducer(aggregate)
val shardIdToSeqNumbers = producer.sendData(streamName, testData)
logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
- shardIdToSeqNumbers.toMap
+ shardIdToSeqNumbers
}
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 7a9c0263631f..2d72b4dd6bf2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -104,7 +104,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
}
val left = num - results.size
- val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+ val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
val buf = new Array[Array[T]](p.size)
self.context.setCallSite(callSite)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index fe10e140f82d..9518433a7f69 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1486,7 +1486,7 @@ abstract class RDD[T: ClassTag](
}
}
- val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+ val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
res.foreach(buf ++= _.take(num - buf.size))
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index c1c36d7a9f04..d50b8f935d56 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -371,7 +371,7 @@ private[spark] class AppStatusStore(
Double.NaN
}
}
- }.toIndexedSeq
+ }
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
index 5e2b8943ed84..9e085c7078e6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
@@ -37,7 +37,7 @@ object MLUtil {
val destFSPath = new FSPath(destPath)
val fs = destFSPath.getFileSystem(hadoopConf)
- fs.copyFromLocalFile(false, true, new FSPath(localPath.toString), destFSPath)
+ fs.copyFromLocalFile(false, true, new FSPath(localPath), destFSPath)
}
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index dcb80221b0e0..17be8cfa12b5 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -139,7 +139,7 @@ object Metadata {
case (key, JInt(value)) =>
builder.putLong(key, value.toLong)
case (key, JLong(value)) =>
- builder.putLong(key, value.toLong)
+ builder.putLong(key, value)
case (key, JDouble(value)) =>
builder.putDouble(key, value)
case (key, JBool(value)) =>
@@ -157,7 +157,7 @@ object Metadata {
case _: JInt =>
builder.putLongArray(key, value.asInstanceOf[List[JInt]].map(_.num.toLong).toArray)
case _: JLong =>
- builder.putLongArray(key, value.asInstanceOf[List[JLong]].map(_.num.toLong).toArray)
+ builder.putLongArray(key, value.asInstanceOf[List[JLong]].map(_.num).toArray)
case _: JDouble =>
builder.putDoubleArray(key, value.asInstanceOf[List[JDouble]].map(_.num).toArray)
case _: JBool =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 3fd1fe04aed6..bb2b7e7ae066 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -105,7 +105,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
functionName: String, argumentName: String, candidates: Seq[String]): Throwable = {
import org.apache.spark.sql.catalyst.util.StringUtils.orderSuggestedIdentifiersBySimilarity
- val inputs = candidates.map(candidate => Seq(candidate)).toSeq
+ val inputs = candidates.map(candidate => Seq(candidate))
val recommendations = orderSuggestedIdentifiersBySimilarity(argumentName, inputs)
.take(3)
new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a3dc976647be..31e1495db7e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1435,7 +1435,7 @@ class Dataset[T] private[sql](
case s: Symbol => Column(s.name).expr
case e: Expression => e
case literal => Literal(literal)
- }.toSeq
+ }
UnresolvedHint(name, exprs, logicalPlan)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c65d1931dd1b..7bc770a0c9e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -518,7 +518,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
}
- val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+ val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
val partsToScan = if (takeFromEnd) {
// Reverse partitions to scan. So, if parts was [1, 2, 3] in 200 partitions (0 to 199),
// it becomes [198, 197, 196].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 89e9de8b0843..88954d6f822d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -285,7 +285,7 @@ case class TableCacheQueryStageExec(
sparkContext.submitJob(
rdd,
(_: Iterator[CachedBatch]) => (),
- (0 until rdd.getNumPartitions).toSeq,
+ (0 until rdd.getNumPartitions),
(_: Int, _: Unit) => (),
()
)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 98a851f19f05..4efd94e442e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -65,7 +65,7 @@ case class AnalyzePartitionCommand(
if (filteredSpec.isEmpty) {
None
} else {
- Some(filteredSpec.toMap)
+ Some(filteredSpec)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index c4e12d5c4ae0..2cf299f87c89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -83,7 +83,7 @@ trait OrcFiltersBase {
.groupBy(_._1.toLowerCase(Locale.ROOT))
.filter(_._2.size == 1)
.transform((_, v) => v.head._2)
- CaseInsensitiveMap(dedupPrimitiveFields.toMap)
+ CaseInsensitiveMap(dedupPrimitiveFields)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index c33a7c472842..101a9e6b9199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -547,13 +547,13 @@ class RocksDB(
readerMemUsage + memTableMemUsage + blockCacheUsage,
pinnedBlocksMemUsage,
totalSSTFilesBytes,
- nativeOpsLatencyMicros.toMap,
+ nativeOpsLatencyMicros,
commitLatencyMs,
bytesCopied = fileManagerMetrics.bytesCopied,
filesCopied = fileManagerMetrics.filesCopied,
filesReused = fileManagerMetrics.filesReused,
zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
- nativeOpsMetrics = nativeOpsMetrics.toMap)
+ nativeOpsMetrics = nativeOpsMetrics)
}
/**
@@ -861,7 +861,7 @@ object RocksDBConf {
}
def getStringConf(conf: ConfEntry): String = {
- Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toString } getOrElse {
+ Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default) } getOrElse {
throw new IllegalArgumentException(
s"Invalid value for '${conf.fullName}', must be a string"
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]