This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e1a2255f99be 
[SPARK-45699][BUILD][CORE][SQL][SS][CONNECT][MLLIB][ML][DSTREAM][GRAPHX][K8S][UI]
 Fixing all compilation warnings related to widening conversions
e1a2255f99be is described below

commit e1a2255f99be88e776295f30f995b339c3e4b5af
Author: hannahkamundson <[email protected]>
AuthorDate: Mon Nov 27 10:38:22 2023 +0800

    
[SPARK-45699][BUILD][CORE][SQL][SS][CONNECT][MLLIB][ML][DSTREAM][GRAPHX][K8S][UI]
 Fixing all compilation warnings related to widening conversions
    
    ### What changes were proposed in this pull request?
    
    1. Change the silencing of the widening conversion compilation warnings in 
the parent `pom.xml` and `SparkBuild` to throw an error
    2. All widening conversion compilation warnings were removed. This almost 
exclusively involved adding `.toDouble` to longs. However, it also involved 
some `.toFloat` on ints and longs.
    
    ### Why are the changes needed?
    It allows us to upgrade to Scala 2.13 without adding a bunch of compilation 
issues. This is removing the following compilation error
    ```shell
    [error] 
/Users/yangjie01/SourceCode/git/spark-mine-sbt/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:1207:60:
 Widening conversion from Long to Double is deprecated because it loses 
precision. Write `.toDouble` instead. [quickfixable]
    [error] Applicable -Wconf / nowarn filters for this fatal warning: 
msg=<part of the message>, cat=deprecation, 
site=org.apache.spark.scheduler.TaskSetManager.checkSpeculatableTasks
    [error]       foundTasks = checkAndSubmitSpeculatableTasks(timeMs, 
threshold, customizedThreshold = true)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No tests were added.
    For every profile (including the base profile), I ran `mvn clean compile 
test-compile`. I then `grep`ped for any lines that had the word `Wide` in them. I 
determined I was done when no output remained.
    
    Here is a script of what was run:
    ```shell
    mvn clean compile test-compile |& tee output.txt
    cat output.txt | grep .*Wide.* |& tee output-widening.txt
    mvn clean compile test-compile -Pspark-ganglia-lgpl |& tee 
output-spark-ganglia-lgpl.txt
    cat output-spark-ganglia-lgpl.txt | grep .*Wide.* |& tee 
output-spark-ganglia-lgpl-widening.txt
    mvn clean compile test-compile -Pkinesis-asl |& tee output-kinesis-asl.txt
    cat output-kinesis-asl.txt | grep .*Wide.* |& tee 
output-kinesis-asl-widening.txt
    mvn clean compile test-compile -Pdocker-integration-tests |& tee 
output-docker-integration-tests.txt
    cat output-docker-integration-tests.txt | grep .*Wide.* |& tee 
output-docker-integration-tests-widening.txt
    mvn clean compile test-compile -Pyarn |& tee output-yarn.txt
    cat output-yarn.txt | grep .*Wide.* |& tee output-yarn-widening.txt
    mvn clean compile test-compile -Pkubernetes |& tee output-kubernetes.txt
    cat output-kubernetes.txt | grep .*Wide.* |& tee 
output-kubernetes-widening.txt
    mvn clean compile test-compile -Pkubernetes-integration-tests |& tee 
output-kubernetes-integration-tests.txt
    cat output-kubernetes-integration-tests.txt | grep .*Wide.* |& tee 
output-kubernetes-integration-tests-widening.txt
    mvn clean compile test-compile -Phive-thriftserver |& tee 
output-hive-thriftserver.txt
    cat output-hive-thriftserver.txt | grep .*Wide.* |& tee 
output-hive-thriftserver-widening.txt
    mvn clean compile test-compile -Phadoop-cloud |& tee output-hadoop-cloud.txt
    cat output-hadoop-cloud.txt | grep .*Wide.* |& tee 
output-hadoop-cloud-widening.txt
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43890 from hannahkamundson/SPARK-45699.
    
    Authored-by: hannahkamundson <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 .../types/UTF8StringPropertyCheckSuite.scala       |  4 +-
 .../connect/client/arrow/ArrowVectorReader.scala   |  6 +--
 .../kafka010/DirectKafkaInputDStream.scala         |  2 +-
 .../apache/spark/streaming/kafka010/KafkaRDD.scala |  2 +-
 .../kafka010/DirectKafkaStreamSuite.scala          |  2 +-
 .../spark/input/FixedLengthBinaryInputFormat.scala |  2 +-
 .../apache/spark/metrics/sink/StatsdReporter.scala |  4 +-
 .../org/apache/spark/partial/CountEvaluator.scala  |  5 +-
 .../spark/partial/GroupedCountEvaluator.scala      |  4 +-
 .../org/apache/spark/resource/ResourceUtils.scala  |  2 +-
 .../org/apache/spark/scheduler/MapStatus.scala     |  8 ++--
 .../apache/spark/scheduler/TaskSetManager.scala    |  7 +--
 .../main/scala/org/apache/spark/util/Clock.scala   |  2 +-
 .../util/random/StratifiedSamplingUtils.scala      |  4 +-
 .../org/apache/spark/benchmark/Benchmark.scala     |  2 +-
 .../spark/deploy/history/EventLogTestHelper.scala  |  2 +-
 .../apache/spark/status/AppStatusStoreSuite.scala  | 54 ++++++++++++----------
 .../org/apache/spark/graphx/lib/SVDPlusPlus.scala  |  2 +-
 .../apache/spark/graphx/lib/PageRankSuite.scala    |  2 +-
 .../regression/GeneralizedLinearRegression.scala   |  2 +-
 .../scala/org/apache/spark/ml/stat/ANOVATest.scala |  2 +-
 .../org/apache/spark/ml/stat/FValueTest.scala      |  2 +-
 .../spark/mllib/clustering/LDAOptimizer.scala      |  2 +-
 .../spark/mllib/clustering/StreamingKMeans.scala   |  2 +-
 .../apache/spark/mllib/fpm/AssociationRules.scala  |  4 +-
 .../spark/mllib/linalg/distributed/RowMatrix.scala |  5 +-
 .../stat/correlation/SpearmanCorrelation.scala     |  2 +-
 .../apache/spark/mllib/stat/test/ChiSqTest.scala   |  2 +-
 .../mllib/stat/test/StreamingTestMethod.scala      |  2 +-
 pom.xml                                            |  2 +-
 project/SparkBuild.scala                           |  2 +-
 .../scheduler/cluster/k8s/ExecutorRollPlugin.scala | 17 +++----
 .../plans/logical/basicLogicalOperators.scala      |  3 +-
 .../logical/statsEstimation/EstimationUtils.scala  |  4 +-
 .../sql/catalyst/util/QuantileSummaries.scala      |  2 +-
 .../ui/StreamingQueryStatisticsPage.scala          | 16 +++----
 .../spark/sql/StatisticsCollectionTestBase.scala   |  8 ++--
 .../compression/PassThroughEncodingSuite.scala     |  2 +-
 .../sql/streaming/EventTimeWatermarkSuite.scala    |  5 +-
 .../spark/streaming/receiver/RateLimiter.scala     |  4 +-
 .../apache/spark/streaming/ui/StreamingPage.scala  | 14 +++---
 .../org/apache/spark/streaming/ui/UIUtils.scala    |  8 ++--
 .../scheduler/ExecutorAllocationManagerSuite.scala |  4 +-
 .../streaming/scheduler/RateControllerSuite.scala  |  2 +-
 44 files changed, 124 insertions(+), 110 deletions(-)

diff --git 
a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
 
b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
index ab488e18ba3f..75c56451592e 100644
--- 
a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
+++ 
b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
@@ -80,7 +80,9 @@ class UTF8StringPropertyCheckSuite extends AnyFunSuite with 
ScalaCheckDrivenProp
 
   test("compare") {
     forAll { (s1: String, s2: String) =>
-      assert(Math.signum(toUTF8(s1).compareTo(toUTF8(s2))) === 
Math.signum(s1.compareTo(s2)))
+      assert(Math.signum {
+        toUTF8(s1).compareTo(toUTF8(s2)).toFloat
+      } === Math.signum(s1.compareTo(s2).toFloat))
     }
   }
 
diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
index 488208574809..53d8d46e6268 100644
--- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
+++ 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
@@ -134,7 +134,7 @@ private[arrow] class SmallIntVectorReader(v: SmallIntVector)
 private[arrow] class IntVectorReader(v: IntVector) extends 
TypedArrowVectorReader[IntVector](v) {
   override def getInt(i: Int): Int = vector.get(i)
   override def getLong(i: Int): Long = getInt(i)
-  override def getFloat(i: Int): Float = getInt(i)
+  override def getFloat(i: Int): Float = getInt(i).toFloat
   override def getDouble(i: Int): Double = getInt(i)
   override def getString(i: Int): String = String.valueOf(getInt(i))
   override def getJavaDecimal(i: Int): JBigDecimal = 
JBigDecimal.valueOf(getInt(i))
@@ -143,8 +143,8 @@ private[arrow] class IntVectorReader(v: IntVector) extends 
TypedArrowVectorReade
 private[arrow] class BigIntVectorReader(v: BigIntVector)
     extends TypedArrowVectorReader[BigIntVector](v) {
   override def getLong(i: Int): Long = vector.get(i)
-  override def getFloat(i: Int): Float = getLong(i)
-  override def getDouble(i: Int): Double = getLong(i)
+  override def getFloat(i: Int): Float = getLong(i).toFloat
+  override def getDouble(i: Int): Double = getLong(i).toDouble
   override def getString(i: Int): String = String.valueOf(getLong(i))
   override def getJavaDecimal(i: Int): JBigDecimal = 
JBigDecimal.valueOf(getLong(i))
   override def getTimestamp(i: Int): Timestamp = toJavaTimestamp(getLong(i) * 
MICROS_PER_SECOND)
diff --git 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index f5967a74ad33..c412486ce197 100644
--- 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -146,7 +146,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
           val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
           val backpressureRate = lag / totalLag.toDouble * rate
           tp -> (if (maxRateLimitPerPartition > 0) {
-            Math.min(backpressureRate, maxRateLimitPerPartition)} else 
backpressureRate)
+            Math.min(backpressureRate, maxRateLimitPerPartition.toDouble)} 
else backpressureRate)
         }
       case None => offsets.map { case (tp, offset) => tp -> 
ppc.maxRatePerPartition(tp).toDouble }
     }
diff --git 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 286b073125ff..6c57091bc3c4 100644
--- 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -98,7 +98,7 @@ private[spark] class KafkaRDD[K, V](
     if (compacted) {
       super.countApprox(timeout, confidence)
     } else {
-      val c = count()
+      val c = count().toDouble
       new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
     }
 
diff --git 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index faf114108fac..28f090625830 100644
--- 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -805,7 +805,7 @@ private[streaming] class ConstantEstimator(@volatile 
private var rate: Long)
       time: Long,
       elements: Long,
       processingDelay: Long,
-      schedulingDelay: Long): Option[Double] = Some(rate)
+      schedulingDelay: Long): Option[Double] = Some(rate.toDouble)
 }
 
 private[streaming] class ConstantRateController(id: Int, estimator: 
RateEstimator, rate: Long)
diff --git 
a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala 
b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
index 978afaffab30..4897cf694ae8 100644
--- 
a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
+++ 
b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -74,7 +74,7 @@ private[spark] class FixedLengthBinaryInputFormat
     if (defaultSize < recordLength) {
       recordLength.toLong
     } else {
-      (Math.floor(defaultSize / recordLength) * recordLength).toLong
+      defaultSize / recordLength * recordLength
     }
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
index 877f04b1adc0..189d390d3799 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
@@ -124,9 +124,9 @@ private[spark] class StatsdReporter(
 
   private def reportTimer(name: String, timer: Timer)(implicit socket: 
DatagramSocket): Unit = {
     val snapshot = timer.getSnapshot
-    send(fullName(name, "max"), format(convertDuration(snapshot.getMax)), 
TIMER)
+    send(fullName(name, "max"), 
format(convertDuration(snapshot.getMax.toDouble)), TIMER)
     send(fullName(name, "mean"), format(convertDuration(snapshot.getMean)), 
TIMER)
-    send(fullName(name, "min"), format(convertDuration(snapshot.getMin)), 
TIMER)
+    send(fullName(name, "min"), 
format(convertDuration(snapshot.getMin.toDouble)), TIMER)
     send(fullName(name, "stddev"), 
format(convertDuration(snapshot.getStdDev)), TIMER)
     send(fullName(name, "p50"), format(convertDuration(snapshot.getMedian)), 
TIMER)
     send(fullName(name, "p75"), 
format(convertDuration(snapshot.get75thPercentile)), TIMER)
diff --git a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala 
b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
index cbee13687101..a974ca2f1a05 100644
--- a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
@@ -35,7 +35,7 @@ private[spark] class CountEvaluator(totalOutputs: Int, 
confidence: Double)
 
   override def currentResult(): BoundedDouble = {
     if (outputsMerged == totalOutputs) {
-      new BoundedDouble(sum, 1.0, sum, sum)
+      new BoundedDouble(sum.toDouble, 1.0, sum.toDouble, sum.toDouble)
     } else if (outputsMerged == 0 || sum == 0) {
       new BoundedDouble(0, 0.0, 0.0, Double.PositiveInfinity)
     } else {
@@ -57,7 +57,8 @@ private[partial] object CountEvaluator {
     val low = dist.inverseCumulativeProbability((1 - confidence) / 2)
     val high = dist.inverseCumulativeProbability((1 + confidence) / 2)
     // Add 'sum' to each because distribution is just of remaining count, not 
observed
-    new BoundedDouble(sum + dist.getNumericalMean, confidence, sum + low, sum 
+ high)
+    new BoundedDouble(
+      sum + dist.getNumericalMean, confidence, (sum + low).toDouble, (sum + 
high).toDouble)
   }
 
 
diff --git 
a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala 
b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
index d2b4187df5d5..7cd60815fadb 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -41,7 +41,9 @@ private[spark] class GroupedCountEvaluator[T : 
ClassTag](totalOutputs: Int, conf
 
   override def currentResult(): Map[T, BoundedDouble] = {
     if (outputsMerged == totalOutputs) {
-      sums.map { case (key, sum) => (key, new BoundedDouble(sum, 1.0, sum, 
sum)) }.toMap
+      sums.map { case (key, sum) =>
+        (key, new BoundedDouble(sum.toDouble, 1.0, sum.toDouble, sum.toDouble))
+      }.toMap
     } else if (outputsMerged == 0) {
       new HashMap[T, BoundedDouble]
     } else {
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 00c655f4a4f4..fe08e8337f76 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -476,7 +476,7 @@ private[spark] object ResourceUtils extends Logging {
       if (maxTaskPerExec < (execAmount * numParts / taskAmount)) {
         val origTaskAmount = treq.amount
         val taskReqStr = s"${origTaskAmount}/${numParts}"
-        val resourceNumSlots = Math.floor(execAmount * numParts / 
taskAmount).toInt
+        val resourceNumSlots = (execAmount * numParts / taskAmount).toInt
         val message = s"The configuration of resource: ${treq.resourceName} " +
           s"(exec = ${execAmount}, task = ${taskReqStr}, " +
           s"runnable tasks = ${resourceNumSlots}) will " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index d10cf55ed0d1..113521453ad7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -95,7 +95,7 @@ private[spark] object MapStatus {
     } else if (size <= 1L) {
       1
     } else {
-      math.min(255, math.ceil(math.log(size) / 
math.log(LOG_BASE)).toInt).toByte
+      math.min(255, math.ceil(math.log(size.toDouble) / 
math.log(LOG_BASE)).toInt).toByte
     }
   }
 
@@ -276,12 +276,12 @@ private[spark] object HighlyCompressedMapStatus {
         val skewSizeThreshold =
           Math.max(
             medianSize * accurateBlockSkewedFactor,
-            sortedSizes(totalNumBlocks - maxAccurateSkewedBlockNumber)
+            sortedSizes(totalNumBlocks - maxAccurateSkewedBlockNumber).toDouble
           )
-        Math.min(shuffleAccurateBlockThreshold, skewSizeThreshold)
+        Math.min(shuffleAccurateBlockThreshold.toDouble, skewSizeThreshold)
       } else {
         // Disable skew detection if accurateBlockSkewedFactor <= 0
-        shuffleAccurateBlockThreshold
+        shuffleAccurateBlockThreshold.toDouble
       }
 
     val hugeBlockSizes = mutable.Map.empty[Int, Byte]
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6157a3e46c87..d17e6735c4ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -809,7 +809,7 @@ private[spark] class TaskSetManager(
 
     info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
     if (speculationEnabled) {
-      successfulTaskDurations.insert(info.duration)
+      successfulTaskDurations.insert(info.duration.toDouble)
       taskProcessRateCalculator.foreach(_.updateAvgTaskProcessRate(tid, 
result))
     }
     removeRunningTask(tid)
@@ -1196,7 +1196,7 @@ private[spark] class TaskSetManager(
     val timeMs = clock.getTimeMillis()
     if (numSuccessfulTasks >= minFinishedForSpeculation) {
       val medianDuration = successfulTaskDurations.percentile()
-      val threshold = max(speculationMultiplier * medianDuration, 
minTimeToSpeculation)
+      val threshold = max(speculationMultiplier * medianDuration, 
minTimeToSpeculation.toDouble)
       // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
@@ -1204,7 +1204,8 @@ private[spark] class TaskSetManager(
     } else if (isSpeculationThresholdSpecified && 
speculationTasksLessEqToSlots) {
       val threshold = speculationTaskDurationThresOpt.get
       logDebug(s"Tasks taking longer time than provided speculation threshold: 
$threshold")
-      foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold, 
customizedThreshold = true)
+      foundTasks = checkAndSubmitSpeculatableTasks(
+        timeMs, threshold.toDouble, customizedThreshold = true)
     }
     // avoid more warning logs.
     if (foundTasks) {
diff --git a/core/src/main/scala/org/apache/spark/util/Clock.scala 
b/core/src/main/scala/org/apache/spark/util/Clock.scala
index 226f15d3d38c..e0cb3f4188e6 100644
--- a/core/src/main/scala/org/apache/spark/util/Clock.scala
+++ b/core/src/main/scala/org/apache/spark/util/Clock.scala
@@ -85,7 +85,7 @@ private[spark] class SystemClock extends Clock {
       return currentTime
     }
 
-    val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
+    val pollTime = math.max(waitTime / 10.0, minPollTime.toDouble).toLong
 
     while (true) {
       currentTime = System.currentTimeMillis()
diff --git 
a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
 
b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
index f08cf44e4e12..08e2ea01f623 100644
--- 
a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
@@ -98,8 +98,8 @@ private[spark] object StratifiedSamplingUtils extends Logging 
{
         if (acceptResult.areBoundsEmpty) {
           val n = counts.get(key)
           val sampleSize = math.ceil(n * fraction).toLong
-          val lmbd1 = PoissonBounds.getLowerBound(sampleSize)
-          val lmbd2 = PoissonBounds.getUpperBound(sampleSize)
+          val lmbd1 = PoissonBounds.getLowerBound(sampleSize.toDouble)
+          val lmbd2 = PoissonBounds.getUpperBound(sampleSize.toDouble)
           acceptResult.acceptBound = lmbd1 / n
           acceptResult.waitListBound = (lmbd2 - lmbd1) / n
         }
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala 
b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
index 0b33e2a9426c..e7315d6119be 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
@@ -163,7 +163,7 @@ private[spark] class Benchmark(
     // scalastyle:on
     assert(runTimes.nonEmpty)
     val best = runTimes.min
-    val avg = runTimes.sum / runTimes.size
+    val avg = runTimes.sum.toDouble / runTimes.size
     val stdev = if (runTimes.size > 1) {
       math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / 
(runTimes.size - 1))
     } else 0
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala 
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala
index ac89f60955ee..0161917f8853 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala
@@ -56,7 +56,7 @@ object EventLogTestHelper {
       eventStr: String,
       desiredSize: Long): Seq[String] = {
     val stringLen = eventStr.getBytes(StandardCharsets.UTF_8).length
-    val repeatCount = Math.floor(desiredSize / stringLen).toInt
+    val repeatCount = (desiredSize / stringLen).toInt
     (0 until repeatCount).map { _ =>
       writer.writeEvent(eventStr, flushLogger = true)
       eventStr
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index ccf6c9184cc9..f2b795764b7e 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -170,40 +170,44 @@ class AppStatusStoreSuite extends SparkFunSuite {
           assert(actualQuantiles === expectedQuantiles)
         }
 
-        assertQuantiles(_.executorDeserializeTime, 
summary.executorDeserializeTime)
-        assertQuantiles(_.executorDeserializeCpuTime, 
summary.executorDeserializeCpuTime)
-        assertQuantiles(_.executorRunTime, summary.executorRunTime)
-        assertQuantiles(_.executorRunTime, summary.executorRunTime)
-        assertQuantiles(_.executorCpuTime, summary.executorCpuTime)
-        assertQuantiles(_.resultSize, summary.resultSize)
-        assertQuantiles(_.jvmGCTime, summary.jvmGcTime)
-        assertQuantiles(_.resultSerializationTime, 
summary.resultSerializationTime)
-        assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled)
-        assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled)
-        assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory)
-        assertQuantiles(_.inputMetrics.bytesRead, 
summary.inputMetrics.bytesRead)
-        assertQuantiles(_.inputMetrics.recordsRead, 
summary.inputMetrics.recordsRead)
-        assertQuantiles(_.outputMetrics.bytesWritten, 
summary.outputMetrics.bytesWritten)
-        assertQuantiles(_.outputMetrics.recordsWritten, 
summary.outputMetrics.recordsWritten)
-        assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched,
+        assertQuantiles(_.executorDeserializeTime.toDouble, 
summary.executorDeserializeTime)
+        assertQuantiles(_.executorDeserializeCpuTime.toDouble, 
summary.executorDeserializeCpuTime)
+        assertQuantiles(_.executorRunTime.toDouble, summary.executorRunTime)
+        assertQuantiles(_.executorRunTime.toDouble, summary.executorRunTime)
+        assertQuantiles(_.executorCpuTime.toDouble, summary.executorCpuTime)
+        assertQuantiles(_.resultSize.toDouble, summary.resultSize)
+        assertQuantiles(_.jvmGCTime.toDouble, summary.jvmGcTime)
+        assertQuantiles(_.resultSerializationTime.toDouble, 
summary.resultSerializationTime)
+        assertQuantiles(_.memoryBytesSpilled.toDouble, 
summary.memoryBytesSpilled)
+        assertQuantiles(_.diskBytesSpilled.toDouble, summary.diskBytesSpilled)
+        assertQuantiles(_.peakExecutionMemory.toDouble, 
summary.peakExecutionMemory)
+        assertQuantiles(_.inputMetrics.bytesRead.toDouble, 
summary.inputMetrics.bytesRead)
+        assertQuantiles(_.inputMetrics.recordsRead.toDouble, 
summary.inputMetrics.recordsRead)
+        assertQuantiles(_.outputMetrics.bytesWritten.toDouble, 
summary.outputMetrics.bytesWritten)
+        assertQuantiles(_.outputMetrics.recordsWritten.toDouble,
+          summary.outputMetrics.recordsWritten)
+        assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched.toDouble,
           summary.shuffleReadMetrics.remoteBlocksFetched)
-        assertQuantiles(_.shuffleReadMetrics.localBlocksFetched,
+        assertQuantiles(_.shuffleReadMetrics.localBlocksFetched.toDouble,
           summary.shuffleReadMetrics.localBlocksFetched)
-        assertQuantiles(_.shuffleReadMetrics.fetchWaitTime,
+        assertQuantiles(_.shuffleReadMetrics.fetchWaitTime.toDouble,
           summary.shuffleReadMetrics.fetchWaitTime)
-        assertQuantiles(_.shuffleReadMetrics.remoteBytesRead,
+        assertQuantiles(_.shuffleReadMetrics.remoteBytesRead.toDouble,
           summary.shuffleReadMetrics.remoteBytesRead)
-        assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk,
+        assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk.toDouble,
           summary.shuffleReadMetrics.remoteBytesReadToDisk)
         assertQuantiles(
-          t => t.shuffleReadMetrics.localBytesRead + 
t.shuffleReadMetrics.remoteBytesRead,
+          t => t.shuffleReadMetrics.localBytesRead + 
t.shuffleReadMetrics.remoteBytesRead.toDouble,
           summary.shuffleReadMetrics.readBytes)
         assertQuantiles(
-          t => t.shuffleReadMetrics.localBlocksFetched + 
t.shuffleReadMetrics.remoteBlocksFetched,
+          t => t.shuffleReadMetrics.localBlocksFetched +
+            t.shuffleReadMetrics.remoteBlocksFetched.toDouble,
           summary.shuffleReadMetrics.totalBlocksFetched)
-        assertQuantiles(_.shuffleWriteMetrics.bytesWritten, 
summary.shuffleWriteMetrics.writeBytes)
-        assertQuantiles(_.shuffleWriteMetrics.writeTime, 
summary.shuffleWriteMetrics.writeTime)
-        assertQuantiles(_.shuffleWriteMetrics.recordsWritten,
+        assertQuantiles(_.shuffleWriteMetrics.bytesWritten.toDouble,
+          summary.shuffleWriteMetrics.writeBytes)
+        assertQuantiles(_.shuffleWriteMetrics.writeTime.toDouble,
+          summary.shuffleWriteMetrics.writeTime)
+        assertQuantiles(_.shuffleWriteMetrics.recordsWritten.toDouble,
           summary.shuffleWriteMetrics.writeRecords)
       } finally {
         appStore.close()
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index d7099c5c953c..bc6fab45810e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -87,7 +87,7 @@ object SVDPlusPlus {
     val gJoinT0 = g.outerJoinVertices(t0) {
       (vid: VertexId, vd: (Array[Double], Array[Double], Double, Double),
        msg: Option[(Long, Double)]) =>
-        (vd._1, vd._2, msg.get._2 / msg.get._1 - u, 1.0 / 
scala.math.sqrt(msg.get._1))
+        (vd._1, vd._2, msg.get._2 / msg.get._1 - u, 1.0 / 
scala.math.sqrt(msg.get._1.toDouble))
     }.cache()
     materialize(gJoinT0)
     g.unpersist()
diff --git 
a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index caa2fdcdf5d2..666790958c35 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -321,7 +321,7 @@ class PageRankSuite extends SparkFunSuite with 
LocalSparkContext {
         val rank = if (vid < source) {
           0.0
         } else {
-          a * Math.pow(1 - resetProb, vid - source)
+          a * Math.pow(1 - resetProb, vid.toDouble - source)
         }
         vid -> rank
       }
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 6e26a78e9c7e..aa39a3e177ee 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -1418,7 +1418,7 @@ class GeneralizedLinearRegressionSummary 
private[regression] (
         case Row(label: Double, pred: Double, weight: Double) =>
           (label, pred, weight)
     }
-    family.aic(t, deviance, numInstances, weightSum) + 2 * rank
+    family.aic(t, deviance, numInstances.toDouble, weightSum) + 2 * rank
   }
 }
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala 
b/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala
index d7b13f1bf25f..482bb7fdc210 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala
@@ -224,7 +224,7 @@ private[ml] object ANOVATest {
     // mean square within
     val msw = sswn / dfwn
     val fValue = msb / msw
-    val pValue = 1 - new FDistribution(dfbn, 
dfwn).cumulativeProbability(fValue)
+    val pValue = 1 - new FDistribution(dfbn.toDouble, 
dfwn.toDouble).cumulativeProbability(fValue)
     val degreesOfFreedom = dfbn + dfwn
     (pValue, degreesOfFreedom, fValue)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala 
b/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
index 89579dfcbb0c..e2ce6cf7214f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
@@ -135,7 +135,7 @@ private[ml] object FValueTest {
         } else Iterator.empty
       }.reduceByKey(_ + _
       ).mapPartitions { iter =>
-        val fd = new FDistribution(1, degreesOfFreedom)
+        val fd = new FDistribution(1.0, degreesOfFreedom.toDouble)
         iter.map { case (col, sumForCov) =>
           // Cov(X,Y) = Sum(((Xi - Avg(X)) * ((Yi-Avg(Y))) / (N-1)
           val covariance = sumForCov / (numSamples - 1)
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index dbcf9017f174..234ecbc46063 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -525,7 +525,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer with 
Logging {
     updateLambda(batchResult, batchSize)
 
     logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
-    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN.toDouble))
 
     this
   }
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index ed6e3ea966b2..17b28ed3eba5 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -106,7 +106,7 @@ class StreamingKMeansModel @Since("1.2.0") (
         val numNewPoints = pointStats.iterator.map { case (_, (_, n)) =>
           n
         }.sum
-        math.pow(decayFactor, numNewPoints)
+        math.pow(decayFactor, numNewPoints.toDouble)
     }
 
     // apply discount to weights
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
index 06c775469195..79f482347289 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
@@ -91,8 +91,8 @@ class AssociationRules private[fpm] (
       .map { case (antecedent, ((consequent, freqUnion), freqAntecedent)) =>
         new Rule(antecedent.toArray,
           consequent.toArray,
-          freqUnion,
-          freqAntecedent,
+          freqUnion.toDouble,
+          freqAntecedent.toDouble,
           // the consequent contains always only one element
           itemSupport.get(consequent.head))
       }.filter(_.confidence >= minConfidence)
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 2bd4877ffc72..37bf9d45f664 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -633,7 +633,7 @@ class RowMatrix @Since("1.0.0") (
     val gamma = if (threshold < 1e-6) {
       Double.PositiveInfinity
     } else {
-      10 * math.log(numCols()) / threshold
+      10 * math.log(numCols().toDouble) / threshold
     }
 
     val summary = Statistics.colStats(rows.map((_, 1.0)), Seq("normL2"))
@@ -823,7 +823,8 @@ class RowMatrix @Since("1.0.0") (
         + s"as it's bigger than maxResultSize ($maxDriverResultSizeInBytes 
Bytes)")
 
     val numerator = math.log(rows.getNumPartitions)
-    val denominator = math.log(maxDriverResultSizeInBytes) - 
math.log(aggregatedObjectSizeInBytes)
+    val denominator = math.log(maxDriverResultSizeInBytes.toDouble) -
+      math.log(aggregatedObjectSizeInBytes.toDouble)
     val desiredTreeDepth = math.ceil(numerator / denominator)
 
     if (desiredTreeDepth > 4) {
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
index aa0bf51ebcd2..28c2b5d5027a 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
@@ -70,7 +70,7 @@ private[stat] object SpearmanCorrelation extends Correlation 
with Logging {
           val output = flush()
           preCol = j
           preVal = v
-          startRank = rank
+          startRank = rank.toDouble
           cachedUids += uid
           output
         } else {
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
index ead9f887fe81..d42df3e2f0dd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
@@ -201,7 +201,7 @@ private[spark] object ChiSqTest extends Logging {
     counts.foreach { case ((label, value), c) =>
       val i = value2Index(value)
       val j = label2Index(label)
-      contingency.update(i, j, c)
+      contingency.update(i, j, c.toDouble)
     }
 
     ChiSqTest.chiSquaredMatrix(contingency, methodName)
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala
index 8f3d0f8b3214..cf0fd388fa74 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala
@@ -131,7 +131,7 @@ private[stat] object StudentTTest extends 
StreamingTestMethod with Logging {
       statsA: StatCounter,
       statsB: StatCounter): StreamingTestResult = {
     def studentDF(sample1: StatisticalSummaryValues, sample2: 
StatisticalSummaryValues): Double =
-      sample1.getN + sample2.getN - 2
+      sample1.getN + sample2.getN - 2.0
 
     new StreamingTestResult(
       tTester.get.homoscedasticTTest(statsA, statsB),
diff --git a/pom.xml b/pom.xml
index ac096a19804d..6ed16d88b0dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2978,7 +2978,7 @@
                 TODO(SPARK-33805): Undo the corresponding deprecated usage 
suppression rule after fixed.
               -->
               
<arg>-Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since
 2.13).+$:e</arg>
-              <arg>-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is 
deprecated because it loses precision).+$:s</arg>
+              <arg>-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is 
deprecated because it loses precision).+$:e</arg>
               <!-- SPARK-45610 Convert "Auto-application to `()` is 
deprecated" to compile error, as it will become a compile error in Scala 3. -->
               <arg>-Wconf:cat=deprecation&amp;msg=Auto-application to \`\(\)\` 
is deprecated:e</arg>
               <!--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index e1db7b506c51..72ea06a8d050 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -236,7 +236,7 @@ object SparkBuild extends PomBuild {
         // TODO(SPARK-33805): Undo the corresponding deprecated usage 
suppression rule after
         //  fixed.
         
"-Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since
 2.13).+$:e",
-        "-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated 
because it loses precision).+$:s",
+        "-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated 
because it loses precision).+$:e",
         // SPARK-45610 Convert "Auto-application to `()` is deprecated" to 
compile error, as it will become a compile error in Scala 3.
         "-Wconf:cat=deprecation&msg=Auto-application to \\`\\(\\)\\` is 
deprecated:e",
         // TODO(SPARK-45615): The issue described by 
https://github.com/scalatest/scalatest/issues/2297 can cause false positives.
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
index 9f5acf2e7e45..a2c4b9be0c56 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
@@ -150,14 +150,15 @@ class ExecutorRollDriverPlugin extends DriverPlugin with 
Logging {
    * Since we will choose only first item, the duplication is okay.
    */
   private def outliersFromMultipleDimensions(listWithoutDriver: 
Seq[v1.ExecutorSummary]) =
-    outliers(listWithoutDriver.filter(_.totalTasks > 0), e => e.totalDuration 
/ e.totalTasks) ++
-      outliers(listWithoutDriver, e => e.totalDuration) ++
-      outliers(listWithoutDriver, e => e.totalGCTime) ++
-      outliers(listWithoutDriver, e => e.failedTasks) ++
-      outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMHeapMemory")) ++
-      outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMOffHeapMemory")) 
++
-      outliers(listWithoutDriver, e => e.totalShuffleWrite) ++
-      outliers(listWithoutDriver, e => e.diskUsed)
+    outliers(listWithoutDriver.filter(_.totalTasks > 0),
+      e => (e.totalDuration / e.totalTasks).toFloat) ++
+      outliers(listWithoutDriver, e => e.totalDuration.toFloat) ++
+      outliers(listWithoutDriver, e => e.totalGCTime.toFloat) ++
+      outliers(listWithoutDriver, e => e.failedTasks.toFloat) ++
+      outliers(listWithoutDriver, e => getPeakMetrics(e, 
"JVMHeapMemory").toFloat) ++
+      outliers(listWithoutDriver, e => getPeakMetrics(e, 
"JVMOffHeapMemory").toFloat) ++
+      outliers(listWithoutDriver, e => e.totalShuffleWrite.toFloat) ++
+      outliers(listWithoutDriver, e => e.diskUsed.toFloat)
 
   /**
    * Return executors whose metrics is outstanding, '(value - mean) > 
2-sigma'. This is
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ff59e60482dc..9bd0f58e3df8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1127,7 +1127,8 @@ case class Range(
           val upperBinValue = getRangeValue(math.max(upperIndexPos, 0))
           val ndv = math.max(upperIndexPos - lowerIndexPos, 1)
           // Update the lowerIndex and lowerBinValue with upper ones for the 
next iteration.
-          (upperIndex, upperBinValue, binAr :+ HistogramBin(lowerBinValue, 
upperBinValue, ndv))
+          (upperIndex, upperBinValue,
+            binAr :+ HistogramBin(lowerBinValue.toDouble, 
upperBinValue.toDouble, ndv))
       }
     Histogram(height, binArray.toArray)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index d645929eea7d..7083014f1f38 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -337,7 +337,7 @@ object EstimationUtils {
               lo = right.lo,
               hi = right.hi,
               leftNdv = left.ndv * leftRatio,
-              rightNdv = right.ndv,
+              rightNdv = right.ndv.toDouble,
               leftNumRows = leftHeight * leftRatio,
               rightNumRows = rightHeight
             )
@@ -350,7 +350,7 @@ object EstimationUtils {
             OverlappedRange(
               lo = left.lo,
               hi = left.hi,
-              leftNdv = left.ndv,
+              leftNdv = left.ndv.toDouble,
               rightNdv = right.ndv * rightRatio,
               leftNumRows = leftHeight,
               rightNumRows = rightHeight * rightRatio
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index 62b6ebde4e09..7b0e6ddd330d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -299,7 +299,7 @@ class QuantileSummaries(
           result(pos) = sampled.last.value
         } else {
           val (newIndex, newMinRank, approxQuantile) =
-            findApproxQuantile(index, minRank, targetError, percentile)
+            findApproxQuantile(index, minRank, targetError.toDouble, 
percentile)
           index = newIndex
           minRank = newMinRank
           result(pos) = approxQuantile
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
index 26cdbcab79fe..b8edebd8bac5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -415,10 +415,10 @@ private[ui] class StreamingQueryStatisticsPage(parent: 
StreamingQueryTab)
         withNumberInvalid { p.processedRowsPerSecond })), Array.empty[(Long, 
Double)])
     val inputRowsData = withNoProgress(query,
       query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
-        withNumberInvalid { p.numInputRows })), Array.empty[(Long, Double)])
+        withNumberInvalid { p.numInputRows.toDouble })), Array.empty[(Long, 
Double)])
     val batchDurations = withNoProgress(query,
       query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
-        withNumberInvalid { p.batchDuration })), Array.empty[(Long, Double)])
+        withNumberInvalid { p.batchDuration.toDouble })), Array.empty[(Long, 
Double)])
     val operationDurationData = withNoProgress(
       query,
       query.recentProgress.map { p =>
@@ -437,7 +437,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: 
StreamingQueryTab)
         inputRateData.toImmutableArraySeq,
         minBatchTime,
         maxBatchTime,
-        minRecordRate,
+        minRecordRate.toDouble,
         maxRecordRate,
         "records/sec")
     graphUIDataForInputRate.generateDataJs(jsCollector)
@@ -449,7 +449,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: 
StreamingQueryTab)
         processRateData.toImmutableArraySeq,
         minBatchTime,
         maxBatchTime,
-        minProcessRate,
+        minProcessRate.toDouble,
         maxProcessRate,
         "records/sec")
     graphUIDataForProcessRate.generateDataJs(jsCollector)
@@ -461,8 +461,8 @@ private[ui] class StreamingQueryStatisticsPage(parent: 
StreamingQueryTab)
         inputRowsData.toImmutableArraySeq,
         minBatchTime,
         maxBatchTime,
-        minRows,
-        maxRows,
+        minRows.toDouble,
+        maxRows.toDouble,
         "records")
     graphUIDataForInputRows.generateDataJs(jsCollector)
 
@@ -473,8 +473,8 @@ private[ui] class StreamingQueryStatisticsPage(parent: 
StreamingQueryTab)
         batchDurations.toImmutableArraySeq,
         minBatchTime,
         maxBatchTime,
-        minBatchDuration,
-        maxBatchDuration,
+        minBatchDuration.toDouble,
+        maxBatchDuration.toDouble,
         "ms")
     graphUIDataForBatchDuration.generateDataJs(jsCollector)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 04e47ac4a113..87eb35ee3e50 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -139,11 +139,11 @@ abstract class StatisticsCollectionTestBase extends 
QueryTest with SQLTestUtils
       Some(Histogram(1, Array(HistogramBin(d1Internal, d1Internal, 1),
         HistogramBin(d1Internal, d2Internal, 1))))))
     colStats.update("ctimestamp", stats("ctimestamp").copy(histogram =
-      Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1),
-        HistogramBin(t1Internal, t2Internal, 1))))))
+      Some(Histogram(1, Array(HistogramBin(t1Internal.toDouble, 
t1Internal.toDouble, 1),
+        HistogramBin(t1Internal.toDouble, t2Internal.toDouble, 1))))))
     colStats.update("ctimestamp_ntz", stats("ctimestamp_ntz").copy(histogram =
-      Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1),
-        HistogramBin(t1Internal, t2Internal, 1))))))
+      Some(Histogram(1, Array(HistogramBin(t1Internal.toDouble, 
t1Internal.toDouble, 1),
+        HistogramBin(t1Internal.toDouble, t2Internal.toDouble, 1))))))
     colStats
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
index 9395c402fa90..39b76ede73d2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
@@ -178,7 +178,7 @@ class PassThroughSuite extends SparkFunSuite {
         case SHORT => Seq(2: Short, 1: Short, 2: Short, nullValue.toShort: 
Short, 5: Short)
         case INT => Seq(2: Int, 1: Int, 2: Int, nullValue: Int, 5: Int)
         case LONG => Seq(2: Long, 1: Long, 2: Long, nullValue: Long, 5: Long)
-        case FLOAT => Seq(2: Float, 1: Float, 2: Float, nullValue: Float, 5: 
Float)
+        case FLOAT => Seq(2: Float, 1: Float, 2: Float, nullValue.toFloat: 
Float, 5: Float)
         case DOUBLE => Seq(2: Double, 1: Double, 2: Double, nullValue: Double, 
5: Double)
       }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 36ee3226087f..b0041b5ee989 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -74,14 +74,15 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
     // Make sure `largeValue` will cause overflow if we use a Long sum to calc 
avg.
     assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
     val stats =
-      EventTimeStats(max = largeValue, min = largeValue, avg = largeValue, 
count = largeValue - 1)
+      EventTimeStats(
+        max = largeValue, min = largeValue, avg = largeValue.toDouble, count = 
largeValue - 1)
     stats.add(largeValue)
     stats.avg should be (largeValue.toDouble +- epsilon)
 
     val stats2 = EventTimeStats(
       max = largeValue + 1,
       min = largeValue,
-      avg = largeValue + 1,
+      avg = largeValue + 1.0,
       count = largeValue)
     stats.merge(stats2)
     stats.avg should be ((largeValue + 0.5) +- epsilon)
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index f77ca3e8fdb4..14e47b8f96fd 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -59,9 +59,9 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) 
extends Logging {
   private[receiver] def updateRate(newRate: Long): Unit =
     if (newRate > 0) {
       if (maxRateLimit > 0) {
-        rateLimiter.setRate(newRate.min(maxRateLimit))
+        rateLimiter.setRate(newRate.min(maxRateLimit).toDouble)
       } else {
-        rateLimiter.setRate(newRate)
+        rateLimiter.setRate(newRate.toDouble)
       }
     }
 
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 9b7014a6640d..bac82b5d331f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -206,8 +206,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
         recordRateForAllStreams.data,
         minBatchTime,
         maxBatchTime,
-        minRecordRate,
-        maxRecordRate,
+        minRecordRate.toDouble,
+        maxRecordRate.toDouble,
         "records/sec")
     graphUIDataForRecordRateOfAllStreams.generateDataJs(jsCollector)
 
@@ -218,7 +218,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
         schedulingDelay.timelineData(normalizedUnit),
         minBatchTime,
         maxBatchTime,
-        minTime,
+        minTime.toDouble,
         maxTime,
         formattedUnit)
     graphUIDataForSchedulingDelay.generateDataJs(jsCollector)
@@ -230,7 +230,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
         processingTime.timelineData(normalizedUnit),
         minBatchTime,
         maxBatchTime,
-        minTime,
+        minTime.toDouble,
         maxTime,
         formattedUnit, Some(batchInterval))
     graphUIDataForProcessingTime.generateDataJs(jsCollector)
@@ -242,7 +242,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
         totalDelay.timelineData(normalizedUnit),
         minBatchTime,
         maxBatchTime,
-        minTime,
+        minTime.toDouble,
         maxTime,
         formattedUnit)
     graphUIDataForTotalDelay.generateDataJs(jsCollector)
@@ -294,7 +294,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
       {if (hasStream) {
         <tr id="inputs-table" style="display: none;" >
           <td colspan="3">
-            {generateInputDStreamsTable(jsCollector, minBatchTime, 
maxBatchTime, minRecordRate)}
+            {generateInputDStreamsTable(jsCollector, minBatchTime, 
maxBatchTime, minRecordRate.toDouble)}
           </td>
         </tr>
       }}
@@ -350,7 +350,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
     val content: Seq[Node] = 
listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).flatMap {
       case (streamId, recordRates) =>
         generateInputDStreamRow(
-          jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated)
+          jsCollector, streamId, recordRates, minX, maxX, minY, 
maxYCalculated.toDouble)
     }
 
     // scalastyle:off
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index 57ec162b0d17..3c5fc8a08e67 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -44,7 +44,7 @@ private[streaming] object UIUtils {
    */
   def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
     if (milliseconds < 1000) {
-      return (milliseconds, TimeUnit.MILLISECONDS)
+      return (milliseconds.toDouble, TimeUnit.MILLISECONDS)
     }
     val seconds = milliseconds.toDouble / 1000
     if (seconds < 60) {
@@ -67,9 +67,9 @@ private[streaming] object UIUtils {
    * will discard the fractional part.
    */
   def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit 
match {
-    case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
-    case TimeUnit.MICROSECONDS => milliseconds * 1000
-    case TimeUnit.MILLISECONDS => milliseconds
+    case TimeUnit.NANOSECONDS => milliseconds.toDouble * 1000 * 1000
+    case TimeUnit.MICROSECONDS => milliseconds.toDouble * 1000
+    case TimeUnit.MILLISECONDS => milliseconds.toDouble
     case TimeUnit.SECONDS => milliseconds / 1000.0
     case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
     case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 7d9dfb100f61..e0ca22ad77d1 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -119,13 +119,13 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
       }
 
       // Batch proc time = batch interval, should increase allocation by 1
-      addBatchProcTimeAndVerifyAllocation(batchDurationMillis) {
+      addBatchProcTimeAndVerifyAllocation(batchDurationMillis.toDouble) {
         verifyTotalRequestedExecs(Some(3)) // one already allocated, increase 
allocation by 1
         verifyScaledDownExec(None)
       }
 
       // Batch proc time = batch interval * 2, should increase allocation by 2
-      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2) {
+      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2.0) {
         verifyTotalRequestedExecs(Some(4))
         verifyScaledDownExec(None)
       }
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
index b5a45fc317d0..a4faed150157 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
@@ -83,5 +83,5 @@ private[streaming] class ConstantEstimator(@volatile private 
var rate: Long)
       time: Long,
       elements: Long,
       processingDelay: Long,
-      schedulingDelay: Long): Option[Double] = Some(rate)
+      schedulingDelay: Long): Option[Double] = Some(rate.toDouble)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to