This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e1a2255f99be
[SPARK-45699][BUILD][CORE][SQL][SS][CONNECT][MLLIB][ML][DSTREAM][GRAPHX][K8S][UI]
Fixing all compilation warnings related to widening conversions
e1a2255f99be is described below
commit e1a2255f99be88e776295f30f995b339c3e4b5af
Author: hannahkamundson <[email protected]>
AuthorDate: Mon Nov 27 10:38:22 2023 +0800
[SPARK-45699][BUILD][CORE][SQL][SS][CONNECT][MLLIB][ML][DSTREAM][GRAPHX][K8S][UI]
Fixing all compilation warnings related to widening conversions
### What changes were proposed in this pull request?
1. Change the silencing of the widening conversion compilation warnings in
the parent `pom.xml` and `SparkBuild` to throw an error
2. All widening conversion compilation warnings were removed. This almost
exclusively involved adding `.toDouble` to longs. However, it also involved
some `.toFloat` on ints and longs.
### Why are the changes needed?
It allows us to upgrade to Scala 2.13 without adding a bunch of compilation
issues. This is removing the following compilation error
```shell
[error]
/Users/yangjie01/SourceCode/git/spark-mine-sbt/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:1207:60:
Widening conversion from Long to Double is deprecated because it loses
precision. Write `.toDouble` instead. [quickfixable]
[error] Applicable -Wconf / nowarn filters for this fatal warning:
msg=<part of the message>, cat=deprecation,
site=org.apache.spark.scheduler.TaskSetManager.checkSpeculatableTasks
[error] foundTasks = checkAndSubmitSpeculatableTasks(timeMs,
threshold, customizedThreshold = true)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No tests were added.
For every profile (including the base profile), I ran `mvn clean compile
test-compile`. I then `grep`ed any lines that had the word `Wide` in it. I
determined I was done when no output remained.
Here is a script of what was run:
```shell
mvn clean compile test-compile |& tee output.txt
cat output.txt | grep .*Wide.* |& tee output-widening.txt
mvn clean compile test-compile -Pspark-ganglia-lgpl |& tee
output-spark-ganglia-lgpl.txt
cat output-spark-ganglia-lgpl.txt | grep .*Wide.* |& tee
output-spark-ganglia-lgpl-widening.txt
mvn clean compile test-compile -Pkinesis-asl |& tee output-kinesis-asl.txt
cat output-kinesis-asl.txt | grep .*Wide.* |& tee
output-kinesis-asl-widening.txt
mvn clean compile test-compile -Pdocker-integration-tests |& tee
output-docker-integration-tests.txt
cat output-docker-integration-tests.txt | grep .*Wide.* |& tee
output-docker-integration-tests-widening.txt
mvn clean compile test-compile -Pyarn |& tee output-yarn.txt
cat output-yarn.txt | grep .*Wide.* |& tee output-yarn-widening.txt
mvn clean compile test-compile -Pkubernetes |& tee output-kubernetes.txt
cat output-kubernetes.txt | grep .*Wide.* |& tee
output-kubernetes-widening.txt
mvn clean compile test-compile -Pkubernetes-integration-tests |& tee
output-kubernetes-integration-tests.txt
cat output-kubernetes-integration-tests.txt | grep .*Wide.* |& tee
output-kubernetes-integration-tests-widening.txt
mvn clean compile test-compile -Phive-thriftserver |& tee
output-hive-thriftserver.txt
cat output-hive-thriftserver.txt | grep .*Wide.* |& tee
output-hive-thriftserver-widening.txt
mvn clean compile test-compile -Phadoop-cloud |& tee output-hadoop-cloud.txt
cat output-hadoop-cloud.txt | grep .*Wide.* |& tee
output-hadoop-cloud-widening.txt
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43890 from hannahkamundson/SPARK-45699.
Authored-by: hannahkamundson <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../types/UTF8StringPropertyCheckSuite.scala | 4 +-
.../connect/client/arrow/ArrowVectorReader.scala | 6 +--
.../kafka010/DirectKafkaInputDStream.scala | 2 +-
.../apache/spark/streaming/kafka010/KafkaRDD.scala | 2 +-
.../kafka010/DirectKafkaStreamSuite.scala | 2 +-
.../spark/input/FixedLengthBinaryInputFormat.scala | 2 +-
.../apache/spark/metrics/sink/StatsdReporter.scala | 4 +-
.../org/apache/spark/partial/CountEvaluator.scala | 5 +-
.../spark/partial/GroupedCountEvaluator.scala | 4 +-
.../org/apache/spark/resource/ResourceUtils.scala | 2 +-
.../org/apache/spark/scheduler/MapStatus.scala | 8 ++--
.../apache/spark/scheduler/TaskSetManager.scala | 7 +--
.../main/scala/org/apache/spark/util/Clock.scala | 2 +-
.../util/random/StratifiedSamplingUtils.scala | 4 +-
.../org/apache/spark/benchmark/Benchmark.scala | 2 +-
.../spark/deploy/history/EventLogTestHelper.scala | 2 +-
.../apache/spark/status/AppStatusStoreSuite.scala | 54 ++++++++++++----------
.../org/apache/spark/graphx/lib/SVDPlusPlus.scala | 2 +-
.../apache/spark/graphx/lib/PageRankSuite.scala | 2 +-
.../regression/GeneralizedLinearRegression.scala | 2 +-
.../scala/org/apache/spark/ml/stat/ANOVATest.scala | 2 +-
.../org/apache/spark/ml/stat/FValueTest.scala | 2 +-
.../spark/mllib/clustering/LDAOptimizer.scala | 2 +-
.../spark/mllib/clustering/StreamingKMeans.scala | 2 +-
.../apache/spark/mllib/fpm/AssociationRules.scala | 4 +-
.../spark/mllib/linalg/distributed/RowMatrix.scala | 5 +-
.../stat/correlation/SpearmanCorrelation.scala | 2 +-
.../apache/spark/mllib/stat/test/ChiSqTest.scala | 2 +-
.../mllib/stat/test/StreamingTestMethod.scala | 2 +-
pom.xml | 2 +-
project/SparkBuild.scala | 2 +-
.../scheduler/cluster/k8s/ExecutorRollPlugin.scala | 17 +++----
.../plans/logical/basicLogicalOperators.scala | 3 +-
.../logical/statsEstimation/EstimationUtils.scala | 4 +-
.../sql/catalyst/util/QuantileSummaries.scala | 2 +-
.../ui/StreamingQueryStatisticsPage.scala | 16 +++----
.../spark/sql/StatisticsCollectionTestBase.scala | 8 ++--
.../compression/PassThroughEncodingSuite.scala | 2 +-
.../sql/streaming/EventTimeWatermarkSuite.scala | 5 +-
.../spark/streaming/receiver/RateLimiter.scala | 4 +-
.../apache/spark/streaming/ui/StreamingPage.scala | 14 +++---
.../org/apache/spark/streaming/ui/UIUtils.scala | 8 ++--
.../scheduler/ExecutorAllocationManagerSuite.scala | 4 +-
.../streaming/scheduler/RateControllerSuite.scala | 2 +-
44 files changed, 124 insertions(+), 110 deletions(-)
diff --git
a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
index ab488e18ba3f..75c56451592e 100644
---
a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
+++
b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
@@ -80,7 +80,9 @@ class UTF8StringPropertyCheckSuite extends AnyFunSuite with
ScalaCheckDrivenProp
test("compare") {
forAll { (s1: String, s2: String) =>
- assert(Math.signum(toUTF8(s1).compareTo(toUTF8(s2))) ===
Math.signum(s1.compareTo(s2)))
+ assert(Math.signum {
+ toUTF8(s1).compareTo(toUTF8(s2)).toFloat
+ } === Math.signum(s1.compareTo(s2).toFloat))
}
}
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
index 488208574809..53d8d46e6268 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
@@ -134,7 +134,7 @@ private[arrow] class SmallIntVectorReader(v: SmallIntVector)
private[arrow] class IntVectorReader(v: IntVector) extends
TypedArrowVectorReader[IntVector](v) {
override def getInt(i: Int): Int = vector.get(i)
override def getLong(i: Int): Long = getInt(i)
- override def getFloat(i: Int): Float = getInt(i)
+ override def getFloat(i: Int): Float = getInt(i).toFloat
override def getDouble(i: Int): Double = getInt(i)
override def getString(i: Int): String = String.valueOf(getInt(i))
override def getJavaDecimal(i: Int): JBigDecimal =
JBigDecimal.valueOf(getInt(i))
@@ -143,8 +143,8 @@ private[arrow] class IntVectorReader(v: IntVector) extends
TypedArrowVectorReade
private[arrow] class BigIntVectorReader(v: BigIntVector)
extends TypedArrowVectorReader[BigIntVector](v) {
override def getLong(i: Int): Long = vector.get(i)
- override def getFloat(i: Int): Float = getLong(i)
- override def getDouble(i: Int): Double = getLong(i)
+ override def getFloat(i: Int): Float = getLong(i).toFloat
+ override def getDouble(i: Int): Double = getLong(i).toDouble
override def getString(i: Int): String = String.valueOf(getLong(i))
override def getJavaDecimal(i: Int): JBigDecimal =
JBigDecimal.valueOf(getLong(i))
override def getTimestamp(i: Int): Timestamp = toJavaTimestamp(getLong(i) *
MICROS_PER_SECOND)
diff --git
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index f5967a74ad33..c412486ce197 100644
---
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -146,7 +146,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
val backpressureRate = lag / totalLag.toDouble * rate
tp -> (if (maxRateLimitPerPartition > 0) {
- Math.min(backpressureRate, maxRateLimitPerPartition)} else
backpressureRate)
+ Math.min(backpressureRate, maxRateLimitPerPartition.toDouble)}
else backpressureRate)
}
case None => offsets.map { case (tp, offset) => tp ->
ppc.maxRatePerPartition(tp).toDouble }
}
diff --git
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 286b073125ff..6c57091bc3c4 100644
---
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -98,7 +98,7 @@ private[spark] class KafkaRDD[K, V](
if (compacted) {
super.countApprox(timeout, confidence)
} else {
- val c = count()
+ val c = count().toDouble
new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
}
diff --git
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index faf114108fac..28f090625830 100644
---
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -805,7 +805,7 @@ private[streaming] class ConstantEstimator(@volatile
private var rate: Long)
time: Long,
elements: Long,
processingDelay: Long,
- schedulingDelay: Long): Option[Double] = Some(rate)
+ schedulingDelay: Long): Option[Double] = Some(rate.toDouble)
}
private[streaming] class ConstantRateController(id: Int, estimator:
RateEstimator, rate: Long)
diff --git
a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
index 978afaffab30..4897cf694ae8 100644
---
a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
+++
b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -74,7 +74,7 @@ private[spark] class FixedLengthBinaryInputFormat
if (defaultSize < recordLength) {
recordLength.toLong
} else {
- (Math.floor(defaultSize / recordLength) * recordLength).toLong
+ defaultSize / recordLength * recordLength
}
}
diff --git
a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
index 877f04b1adc0..189d390d3799 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
@@ -124,9 +124,9 @@ private[spark] class StatsdReporter(
private def reportTimer(name: String, timer: Timer)(implicit socket:
DatagramSocket): Unit = {
val snapshot = timer.getSnapshot
- send(fullName(name, "max"), format(convertDuration(snapshot.getMax)),
TIMER)
+ send(fullName(name, "max"),
format(convertDuration(snapshot.getMax.toDouble)), TIMER)
send(fullName(name, "mean"), format(convertDuration(snapshot.getMean)),
TIMER)
- send(fullName(name, "min"), format(convertDuration(snapshot.getMin)),
TIMER)
+ send(fullName(name, "min"),
format(convertDuration(snapshot.getMin.toDouble)), TIMER)
send(fullName(name, "stddev"),
format(convertDuration(snapshot.getStdDev)), TIMER)
send(fullName(name, "p50"), format(convertDuration(snapshot.getMedian)),
TIMER)
send(fullName(name, "p75"),
format(convertDuration(snapshot.get75thPercentile)), TIMER)
diff --git a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
index cbee13687101..a974ca2f1a05 100644
--- a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
@@ -35,7 +35,7 @@ private[spark] class CountEvaluator(totalOutputs: Int,
confidence: Double)
override def currentResult(): BoundedDouble = {
if (outputsMerged == totalOutputs) {
- new BoundedDouble(sum, 1.0, sum, sum)
+ new BoundedDouble(sum.toDouble, 1.0, sum.toDouble, sum.toDouble)
} else if (outputsMerged == 0 || sum == 0) {
new BoundedDouble(0, 0.0, 0.0, Double.PositiveInfinity)
} else {
@@ -57,7 +57,8 @@ private[partial] object CountEvaluator {
val low = dist.inverseCumulativeProbability((1 - confidence) / 2)
val high = dist.inverseCumulativeProbability((1 + confidence) / 2)
// Add 'sum' to each because distribution is just of remaining count, not
observed
- new BoundedDouble(sum + dist.getNumericalMean, confidence, sum + low, sum
+ high)
+ new BoundedDouble(
+ sum + dist.getNumericalMean, confidence, (sum + low).toDouble, (sum +
high).toDouble)
}
diff --git
a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
index d2b4187df5d5..7cd60815fadb 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -41,7 +41,9 @@ private[spark] class GroupedCountEvaluator[T :
ClassTag](totalOutputs: Int, conf
override def currentResult(): Map[T, BoundedDouble] = {
if (outputsMerged == totalOutputs) {
- sums.map { case (key, sum) => (key, new BoundedDouble(sum, 1.0, sum,
sum)) }.toMap
+ sums.map { case (key, sum) =>
+ (key, new BoundedDouble(sum.toDouble, 1.0, sum.toDouble, sum.toDouble))
+ }.toMap
} else if (outputsMerged == 0) {
new HashMap[T, BoundedDouble]
} else {
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 00c655f4a4f4..fe08e8337f76 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -476,7 +476,7 @@ private[spark] object ResourceUtils extends Logging {
if (maxTaskPerExec < (execAmount * numParts / taskAmount)) {
val origTaskAmount = treq.amount
val taskReqStr = s"${origTaskAmount}/${numParts}"
- val resourceNumSlots = Math.floor(execAmount * numParts /
taskAmount).toInt
+ val resourceNumSlots = (execAmount * numParts / taskAmount).toInt
val message = s"The configuration of resource: ${treq.resourceName} " +
s"(exec = ${execAmount}, task = ${taskReqStr}, " +
s"runnable tasks = ${resourceNumSlots}) will " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index d10cf55ed0d1..113521453ad7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -95,7 +95,7 @@ private[spark] object MapStatus {
} else if (size <= 1L) {
1
} else {
- math.min(255, math.ceil(math.log(size) /
math.log(LOG_BASE)).toInt).toByte
+ math.min(255, math.ceil(math.log(size.toDouble) /
math.log(LOG_BASE)).toInt).toByte
}
}
@@ -276,12 +276,12 @@ private[spark] object HighlyCompressedMapStatus {
val skewSizeThreshold =
Math.max(
medianSize * accurateBlockSkewedFactor,
- sortedSizes(totalNumBlocks - maxAccurateSkewedBlockNumber)
+ sortedSizes(totalNumBlocks - maxAccurateSkewedBlockNumber).toDouble
)
- Math.min(shuffleAccurateBlockThreshold, skewSizeThreshold)
+ Math.min(shuffleAccurateBlockThreshold.toDouble, skewSizeThreshold)
} else {
// Disable skew detection if accurateBlockSkewedFactor <= 0
- shuffleAccurateBlockThreshold
+ shuffleAccurateBlockThreshold.toDouble
}
val hugeBlockSizes = mutable.Map.empty[Int, Byte]
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6157a3e46c87..d17e6735c4ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -809,7 +809,7 @@ private[spark] class TaskSetManager(
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
if (speculationEnabled) {
- successfulTaskDurations.insert(info.duration)
+ successfulTaskDurations.insert(info.duration.toDouble)
taskProcessRateCalculator.foreach(_.updateAvgTaskProcessRate(tid,
result))
}
removeRunningTask(tid)
@@ -1196,7 +1196,7 @@ private[spark] class TaskSetManager(
val timeMs = clock.getTimeMillis()
if (numSuccessfulTasks >= minFinishedForSpeculation) {
val medianDuration = successfulTaskDurations.percentile()
- val threshold = max(speculationMultiplier * medianDuration,
minTimeToSpeculation)
+ val threshold = max(speculationMultiplier * medianDuration,
minTimeToSpeculation.toDouble)
// TODO: Threshold should also look at standard deviation of task
durations and have a lower
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
@@ -1204,7 +1204,8 @@ private[spark] class TaskSetManager(
} else if (isSpeculationThresholdSpecified &&
speculationTasksLessEqToSlots) {
val threshold = speculationTaskDurationThresOpt.get
logDebug(s"Tasks taking longer time than provided speculation threshold:
$threshold")
- foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold,
customizedThreshold = true)
+ foundTasks = checkAndSubmitSpeculatableTasks(
+ timeMs, threshold.toDouble, customizedThreshold = true)
}
// avoid more warning logs.
if (foundTasks) {
diff --git a/core/src/main/scala/org/apache/spark/util/Clock.scala
b/core/src/main/scala/org/apache/spark/util/Clock.scala
index 226f15d3d38c..e0cb3f4188e6 100644
--- a/core/src/main/scala/org/apache/spark/util/Clock.scala
+++ b/core/src/main/scala/org/apache/spark/util/Clock.scala
@@ -85,7 +85,7 @@ private[spark] class SystemClock extends Clock {
return currentTime
}
- val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
+ val pollTime = math.max(waitTime / 10.0, minPollTime.toDouble).toLong
while (true) {
currentTime = System.currentTimeMillis()
diff --git
a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
index f08cf44e4e12..08e2ea01f623 100644
---
a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
+++
b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
@@ -98,8 +98,8 @@ private[spark] object StratifiedSamplingUtils extends Logging
{
if (acceptResult.areBoundsEmpty) {
val n = counts.get(key)
val sampleSize = math.ceil(n * fraction).toLong
- val lmbd1 = PoissonBounds.getLowerBound(sampleSize)
- val lmbd2 = PoissonBounds.getUpperBound(sampleSize)
+ val lmbd1 = PoissonBounds.getLowerBound(sampleSize.toDouble)
+ val lmbd2 = PoissonBounds.getUpperBound(sampleSize.toDouble)
acceptResult.acceptBound = lmbd1 / n
acceptResult.waitListBound = (lmbd2 - lmbd1) / n
}
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
index 0b33e2a9426c..e7315d6119be 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
@@ -163,7 +163,7 @@ private[spark] class Benchmark(
// scalastyle:on
assert(runTimes.nonEmpty)
val best = runTimes.min
- val avg = runTimes.sum / runTimes.size
+ val avg = runTimes.sum.toDouble / runTimes.size
val stdev = if (runTimes.size > 1) {
math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum /
(runTimes.size - 1))
} else 0
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala
index ac89f60955ee..0161917f8853 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala
@@ -56,7 +56,7 @@ object EventLogTestHelper {
eventStr: String,
desiredSize: Long): Seq[String] = {
val stringLen = eventStr.getBytes(StandardCharsets.UTF_8).length
- val repeatCount = Math.floor(desiredSize / stringLen).toInt
+ val repeatCount = (desiredSize / stringLen).toInt
(0 until repeatCount).map { _ =>
writer.writeEvent(eventStr, flushLogger = true)
eventStr
diff --git
a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index ccf6c9184cc9..f2b795764b7e 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -170,40 +170,44 @@ class AppStatusStoreSuite extends SparkFunSuite {
assert(actualQuantiles === expectedQuantiles)
}
- assertQuantiles(_.executorDeserializeTime,
summary.executorDeserializeTime)
- assertQuantiles(_.executorDeserializeCpuTime,
summary.executorDeserializeCpuTime)
- assertQuantiles(_.executorRunTime, summary.executorRunTime)
- assertQuantiles(_.executorRunTime, summary.executorRunTime)
- assertQuantiles(_.executorCpuTime, summary.executorCpuTime)
- assertQuantiles(_.resultSize, summary.resultSize)
- assertQuantiles(_.jvmGCTime, summary.jvmGcTime)
- assertQuantiles(_.resultSerializationTime,
summary.resultSerializationTime)
- assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled)
- assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled)
- assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory)
- assertQuantiles(_.inputMetrics.bytesRead,
summary.inputMetrics.bytesRead)
- assertQuantiles(_.inputMetrics.recordsRead,
summary.inputMetrics.recordsRead)
- assertQuantiles(_.outputMetrics.bytesWritten,
summary.outputMetrics.bytesWritten)
- assertQuantiles(_.outputMetrics.recordsWritten,
summary.outputMetrics.recordsWritten)
- assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched,
+ assertQuantiles(_.executorDeserializeTime.toDouble,
summary.executorDeserializeTime)
+ assertQuantiles(_.executorDeserializeCpuTime.toDouble,
summary.executorDeserializeCpuTime)
+ assertQuantiles(_.executorRunTime.toDouble, summary.executorRunTime)
+ assertQuantiles(_.executorRunTime.toDouble, summary.executorRunTime)
+ assertQuantiles(_.executorCpuTime.toDouble, summary.executorCpuTime)
+ assertQuantiles(_.resultSize.toDouble, summary.resultSize)
+ assertQuantiles(_.jvmGCTime.toDouble, summary.jvmGcTime)
+ assertQuantiles(_.resultSerializationTime.toDouble,
summary.resultSerializationTime)
+ assertQuantiles(_.memoryBytesSpilled.toDouble,
summary.memoryBytesSpilled)
+ assertQuantiles(_.diskBytesSpilled.toDouble, summary.diskBytesSpilled)
+ assertQuantiles(_.peakExecutionMemory.toDouble,
summary.peakExecutionMemory)
+ assertQuantiles(_.inputMetrics.bytesRead.toDouble,
summary.inputMetrics.bytesRead)
+ assertQuantiles(_.inputMetrics.recordsRead.toDouble,
summary.inputMetrics.recordsRead)
+ assertQuantiles(_.outputMetrics.bytesWritten.toDouble,
summary.outputMetrics.bytesWritten)
+ assertQuantiles(_.outputMetrics.recordsWritten.toDouble,
+ summary.outputMetrics.recordsWritten)
+ assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched.toDouble,
summary.shuffleReadMetrics.remoteBlocksFetched)
- assertQuantiles(_.shuffleReadMetrics.localBlocksFetched,
+ assertQuantiles(_.shuffleReadMetrics.localBlocksFetched.toDouble,
summary.shuffleReadMetrics.localBlocksFetched)
- assertQuantiles(_.shuffleReadMetrics.fetchWaitTime,
+ assertQuantiles(_.shuffleReadMetrics.fetchWaitTime.toDouble,
summary.shuffleReadMetrics.fetchWaitTime)
- assertQuantiles(_.shuffleReadMetrics.remoteBytesRead,
+ assertQuantiles(_.shuffleReadMetrics.remoteBytesRead.toDouble,
summary.shuffleReadMetrics.remoteBytesRead)
- assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk,
+ assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk.toDouble,
summary.shuffleReadMetrics.remoteBytesReadToDisk)
assertQuantiles(
- t => t.shuffleReadMetrics.localBytesRead +
t.shuffleReadMetrics.remoteBytesRead,
+ t => t.shuffleReadMetrics.localBytesRead +
t.shuffleReadMetrics.remoteBytesRead.toDouble,
summary.shuffleReadMetrics.readBytes)
assertQuantiles(
- t => t.shuffleReadMetrics.localBlocksFetched +
t.shuffleReadMetrics.remoteBlocksFetched,
+ t => t.shuffleReadMetrics.localBlocksFetched +
+ t.shuffleReadMetrics.remoteBlocksFetched.toDouble,
summary.shuffleReadMetrics.totalBlocksFetched)
- assertQuantiles(_.shuffleWriteMetrics.bytesWritten,
summary.shuffleWriteMetrics.writeBytes)
- assertQuantiles(_.shuffleWriteMetrics.writeTime,
summary.shuffleWriteMetrics.writeTime)
- assertQuantiles(_.shuffleWriteMetrics.recordsWritten,
+ assertQuantiles(_.shuffleWriteMetrics.bytesWritten.toDouble,
+ summary.shuffleWriteMetrics.writeBytes)
+ assertQuantiles(_.shuffleWriteMetrics.writeTime.toDouble,
+ summary.shuffleWriteMetrics.writeTime)
+ assertQuantiles(_.shuffleWriteMetrics.recordsWritten.toDouble,
summary.shuffleWriteMetrics.writeRecords)
} finally {
appStore.close()
diff --git
a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index d7099c5c953c..bc6fab45810e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -87,7 +87,7 @@ object SVDPlusPlus {
val gJoinT0 = g.outerJoinVertices(t0) {
(vid: VertexId, vd: (Array[Double], Array[Double], Double, Double),
msg: Option[(Long, Double)]) =>
- (vd._1, vd._2, msg.get._2 / msg.get._1 - u, 1.0 /
scala.math.sqrt(msg.get._1))
+ (vd._1, vd._2, msg.get._2 / msg.get._1 - u, 1.0 /
scala.math.sqrt(msg.get._1.toDouble))
}.cache()
materialize(gJoinT0)
g.unpersist()
diff --git
a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index caa2fdcdf5d2..666790958c35 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -321,7 +321,7 @@ class PageRankSuite extends SparkFunSuite with
LocalSparkContext {
val rank = if (vid < source) {
0.0
} else {
- a * Math.pow(1 - resetProb, vid - source)
+ a * Math.pow(1 - resetProb, vid.toDouble - source)
}
vid -> rank
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 6e26a78e9c7e..aa39a3e177ee 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -1418,7 +1418,7 @@ class GeneralizedLinearRegressionSummary
private[regression] (
case Row(label: Double, pred: Double, weight: Double) =>
(label, pred, weight)
}
- family.aic(t, deviance, numInstances, weightSum) + 2 * rank
+ family.aic(t, deviance, numInstances.toDouble, weightSum) + 2 * rank
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala
b/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala
index d7b13f1bf25f..482bb7fdc210 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala
@@ -224,7 +224,7 @@ private[ml] object ANOVATest {
// mean square within
val msw = sswn / dfwn
val fValue = msb / msw
- val pValue = 1 - new FDistribution(dfbn,
dfwn).cumulativeProbability(fValue)
+ val pValue = 1 - new FDistribution(dfbn.toDouble,
dfwn.toDouble).cumulativeProbability(fValue)
val degreesOfFreedom = dfbn + dfwn
(pValue, degreesOfFreedom, fValue)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
b/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
index 89579dfcbb0c..e2ce6cf7214f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala
@@ -135,7 +135,7 @@ private[ml] object FValueTest {
} else Iterator.empty
}.reduceByKey(_ + _
).mapPartitions { iter =>
- val fd = new FDistribution(1, degreesOfFreedom)
+ val fd = new FDistribution(1.0, degreesOfFreedom.toDouble)
iter.map { case (col, sumForCov) =>
// Cov(X,Y) = Sum(((Xi - Avg(X)) * ((Yi-Avg(Y))) / (N-1)
val covariance = sumForCov / (numSamples - 1)
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index dbcf9017f174..234ecbc46063 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -525,7 +525,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer with
Logging {
updateLambda(batchResult, batchSize)
logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
- logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+ logphatOption.foreach(updateAlpha(_, nonEmptyDocsN.toDouble))
this
}
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index ed6e3ea966b2..17b28ed3eba5 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -106,7 +106,7 @@ class StreamingKMeansModel @Since("1.2.0") (
val numNewPoints = pointStats.iterator.map { case (_, (_, n)) =>
n
}.sum
- math.pow(decayFactor, numNewPoints)
+ math.pow(decayFactor, numNewPoints.toDouble)
}
// apply discount to weights
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
index 06c775469195..79f482347289 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
@@ -91,8 +91,8 @@ class AssociationRules private[fpm] (
.map { case (antecedent, ((consequent, freqUnion), freqAntecedent)) =>
new Rule(antecedent.toArray,
consequent.toArray,
- freqUnion,
- freqAntecedent,
+ freqUnion.toDouble,
+ freqAntecedent.toDouble,
// the consequent contains always only one element
itemSupport.get(consequent.head))
}.filter(_.confidence >= minConfidence)
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 2bd4877ffc72..37bf9d45f664 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -633,7 +633,7 @@ class RowMatrix @Since("1.0.0") (
val gamma = if (threshold < 1e-6) {
Double.PositiveInfinity
} else {
- 10 * math.log(numCols()) / threshold
+ 10 * math.log(numCols().toDouble) / threshold
}
val summary = Statistics.colStats(rows.map((_, 1.0)), Seq("normL2"))
@@ -823,7 +823,8 @@ class RowMatrix @Since("1.0.0") (
+ s"as it's bigger than maxResultSize ($maxDriverResultSizeInBytes
Bytes)")
val numerator = math.log(rows.getNumPartitions)
- val denominator = math.log(maxDriverResultSizeInBytes) -
math.log(aggregatedObjectSizeInBytes)
+ val denominator = math.log(maxDriverResultSizeInBytes.toDouble) -
+ math.log(aggregatedObjectSizeInBytes.toDouble)
val desiredTreeDepth = math.ceil(numerator / denominator)
if (desiredTreeDepth > 4) {
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
index aa0bf51ebcd2..28c2b5d5027a 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
@@ -70,7 +70,7 @@ private[stat] object SpearmanCorrelation extends Correlation
with Logging {
val output = flush()
preCol = j
preVal = v
- startRank = rank
+ startRank = rank.toDouble
cachedUids += uid
output
} else {
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
index ead9f887fe81..d42df3e2f0dd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
@@ -201,7 +201,7 @@ private[spark] object ChiSqTest extends Logging {
counts.foreach { case ((label, value), c) =>
val i = value2Index(value)
val j = label2Index(label)
- contingency.update(i, j, c)
+ contingency.update(i, j, c.toDouble)
}
ChiSqTest.chiSquaredMatrix(contingency, methodName)
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala
index 8f3d0f8b3214..cf0fd388fa74 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala
@@ -131,7 +131,7 @@ private[stat] object StudentTTest extends
StreamingTestMethod with Logging {
statsA: StatCounter,
statsB: StatCounter): StreamingTestResult = {
def studentDF(sample1: StatisticalSummaryValues, sample2:
StatisticalSummaryValues): Double =
- sample1.getN + sample2.getN - 2
+ sample1.getN + sample2.getN - 2.0
new StreamingTestResult(
tTester.get.homoscedasticTTest(statsA, statsB),
diff --git a/pom.xml b/pom.xml
index ac096a19804d..6ed16d88b0dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2978,7 +2978,7 @@
TODO(SPARK-33805): Undo the corresponding deprecated usage
suppression rule after fixed.
-->
<arg>-Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since
2.13).+$:e</arg>
- <arg>-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is
deprecated because it loses precision).+$:s</arg>
+ <arg>-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is
deprecated because it loses precision).+$:e</arg>
<!-- SPARK-45610 Convert "Auto-application to `()` is
deprecated" to compile error, as it will become a compile error in Scala 3. -->
<arg>-Wconf:cat=deprecation&msg=Auto-application to \`\(\)\`
is deprecated:e</arg>
<!--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index e1db7b506c51..72ea06a8d050 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -236,7 +236,7 @@ object SparkBuild extends PomBuild {
// TODO(SPARK-33805): Undo the corresponding deprecated usage
suppression rule after
// fixed.
"-Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since
2.13).+$:e",
- "-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated
because it loses precision).+$:s",
+ "-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated
because it loses precision).+$:e",
// SPARK-45610 Convert "Auto-application to `()` is deprecated" to
compile error, as it will become a compile error in Scala 3.
"-Wconf:cat=deprecation&msg=Auto-application to \\`\\(\\)\\` is
deprecated:e",
// TODO(SPARK-45615): The issue described by
https://github.com/scalatest/scalatest/issues/2297 can cause false positives.
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
index 9f5acf2e7e45..a2c4b9be0c56 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
@@ -150,14 +150,15 @@ class ExecutorRollDriverPlugin extends DriverPlugin with
Logging {
* Since we will choose only first item, the duplication is okay.
*/
private def outliersFromMultipleDimensions(listWithoutDriver:
Seq[v1.ExecutorSummary]) =
- outliers(listWithoutDriver.filter(_.totalTasks > 0), e => e.totalDuration
/ e.totalTasks) ++
- outliers(listWithoutDriver, e => e.totalDuration) ++
- outliers(listWithoutDriver, e => e.totalGCTime) ++
- outliers(listWithoutDriver, e => e.failedTasks) ++
- outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMHeapMemory")) ++
- outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMOffHeapMemory"))
++
- outliers(listWithoutDriver, e => e.totalShuffleWrite) ++
- outliers(listWithoutDriver, e => e.diskUsed)
+ outliers(listWithoutDriver.filter(_.totalTasks > 0),
+ e => (e.totalDuration / e.totalTasks).toFloat) ++
+ outliers(listWithoutDriver, e => e.totalDuration.toFloat) ++
+ outliers(listWithoutDriver, e => e.totalGCTime.toFloat) ++
+ outliers(listWithoutDriver, e => e.failedTasks.toFloat) ++
+ outliers(listWithoutDriver, e => getPeakMetrics(e,
"JVMHeapMemory").toFloat) ++
+ outliers(listWithoutDriver, e => getPeakMetrics(e,
"JVMOffHeapMemory").toFloat) ++
+ outliers(listWithoutDriver, e => e.totalShuffleWrite.toFloat) ++
+ outliers(listWithoutDriver, e => e.diskUsed.toFloat)
/**
* Return executors whose metrics is outstanding, '(value - mean) >
2-sigma'. This is
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ff59e60482dc..9bd0f58e3df8 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1127,7 +1127,8 @@ case class Range(
val upperBinValue = getRangeValue(math.max(upperIndexPos, 0))
val ndv = math.max(upperIndexPos - lowerIndexPos, 1)
// Update the lowerIndex and lowerBinValue with upper ones for the
next iteration.
- (upperIndex, upperBinValue, binAr :+ HistogramBin(lowerBinValue,
upperBinValue, ndv))
+ (upperIndex, upperBinValue,
+ binAr :+ HistogramBin(lowerBinValue.toDouble,
upperBinValue.toDouble, ndv))
}
Histogram(height, binArray.toArray)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index d645929eea7d..7083014f1f38 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -337,7 +337,7 @@ object EstimationUtils {
lo = right.lo,
hi = right.hi,
leftNdv = left.ndv * leftRatio,
- rightNdv = right.ndv,
+ rightNdv = right.ndv.toDouble,
leftNumRows = leftHeight * leftRatio,
rightNumRows = rightHeight
)
@@ -350,7 +350,7 @@ object EstimationUtils {
OverlappedRange(
lo = left.lo,
hi = left.hi,
- leftNdv = left.ndv,
+ leftNdv = left.ndv.toDouble,
rightNdv = right.ndv * rightRatio,
leftNumRows = leftHeight,
rightNumRows = rightHeight * rightRatio
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index 62b6ebde4e09..7b0e6ddd330d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -299,7 +299,7 @@ class QuantileSummaries(
result(pos) = sampled.last.value
} else {
val (newIndex, newMinRank, approxQuantile) =
- findApproxQuantile(index, minRank, targetError, percentile)
+ findApproxQuantile(index, minRank, targetError.toDouble,
percentile)
index = newIndex
minRank = newMinRank
result(pos) = approxQuantile
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
index 26cdbcab79fe..b8edebd8bac5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -415,10 +415,10 @@ private[ui] class StreamingQueryStatisticsPage(parent:
StreamingQueryTab)
withNumberInvalid { p.processedRowsPerSecond })), Array.empty[(Long,
Double)])
val inputRowsData = withNoProgress(query,
query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
- withNumberInvalid { p.numInputRows })), Array.empty[(Long, Double)])
+ withNumberInvalid { p.numInputRows.toDouble })), Array.empty[(Long,
Double)])
val batchDurations = withNoProgress(query,
query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
- withNumberInvalid { p.batchDuration })), Array.empty[(Long, Double)])
+ withNumberInvalid { p.batchDuration.toDouble })), Array.empty[(Long,
Double)])
val operationDurationData = withNoProgress(
query,
query.recentProgress.map { p =>
@@ -437,7 +437,7 @@ private[ui] class StreamingQueryStatisticsPage(parent:
StreamingQueryTab)
inputRateData.toImmutableArraySeq,
minBatchTime,
maxBatchTime,
- minRecordRate,
+ minRecordRate.toDouble,
maxRecordRate,
"records/sec")
graphUIDataForInputRate.generateDataJs(jsCollector)
@@ -449,7 +449,7 @@ private[ui] class StreamingQueryStatisticsPage(parent:
StreamingQueryTab)
processRateData.toImmutableArraySeq,
minBatchTime,
maxBatchTime,
- minProcessRate,
+ minProcessRate.toDouble,
maxProcessRate,
"records/sec")
graphUIDataForProcessRate.generateDataJs(jsCollector)
@@ -461,8 +461,8 @@ private[ui] class StreamingQueryStatisticsPage(parent:
StreamingQueryTab)
inputRowsData.toImmutableArraySeq,
minBatchTime,
maxBatchTime,
- minRows,
- maxRows,
+ minRows.toDouble,
+ maxRows.toDouble,
"records")
graphUIDataForInputRows.generateDataJs(jsCollector)
@@ -473,8 +473,8 @@ private[ui] class StreamingQueryStatisticsPage(parent:
StreamingQueryTab)
batchDurations.toImmutableArraySeq,
minBatchTime,
maxBatchTime,
- minBatchDuration,
- maxBatchDuration,
+ minBatchDuration.toDouble,
+ maxBatchDuration.toDouble,
"ms")
graphUIDataForBatchDuration.generateDataJs(jsCollector)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 04e47ac4a113..87eb35ee3e50 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -139,11 +139,11 @@ abstract class StatisticsCollectionTestBase extends
QueryTest with SQLTestUtils
Some(Histogram(1, Array(HistogramBin(d1Internal, d1Internal, 1),
HistogramBin(d1Internal, d2Internal, 1))))))
colStats.update("ctimestamp", stats("ctimestamp").copy(histogram =
- Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1),
- HistogramBin(t1Internal, t2Internal, 1))))))
+ Some(Histogram(1, Array(HistogramBin(t1Internal.toDouble,
t1Internal.toDouble, 1),
+ HistogramBin(t1Internal.toDouble, t2Internal.toDouble, 1))))))
colStats.update("ctimestamp_ntz", stats("ctimestamp_ntz").copy(histogram =
- Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1),
- HistogramBin(t1Internal, t2Internal, 1))))))
+ Some(Histogram(1, Array(HistogramBin(t1Internal.toDouble,
t1Internal.toDouble, 1),
+ HistogramBin(t1Internal.toDouble, t2Internal.toDouble, 1))))))
colStats
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
index 9395c402fa90..39b76ede73d2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
@@ -178,7 +178,7 @@ class PassThroughSuite extends SparkFunSuite {
case SHORT => Seq(2: Short, 1: Short, 2: Short, nullValue.toShort:
Short, 5: Short)
case INT => Seq(2: Int, 1: Int, 2: Int, nullValue: Int, 5: Int)
case LONG => Seq(2: Long, 1: Long, 2: Long, nullValue: Long, 5: Long)
- case FLOAT => Seq(2: Float, 1: Float, 2: Float, nullValue: Float, 5:
Float)
+ case FLOAT => Seq(2: Float, 1: Float, 2: Float, nullValue.toFloat:
Float, 5: Float)
case DOUBLE => Seq(2: Double, 1: Double, 2: Double, nullValue: Double,
5: Double)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 36ee3226087f..b0041b5ee989 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -74,14 +74,15 @@ class EventTimeWatermarkSuite extends StreamTest with
BeforeAndAfter with Matche
// Make sure `largeValue` will cause overflow if we use a Long sum to calc
avg.
assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
val stats =
- EventTimeStats(max = largeValue, min = largeValue, avg = largeValue,
count = largeValue - 1)
+ EventTimeStats(
+ max = largeValue, min = largeValue, avg = largeValue.toDouble, count =
largeValue - 1)
stats.add(largeValue)
stats.avg should be (largeValue.toDouble +- epsilon)
val stats2 = EventTimeStats(
max = largeValue + 1,
min = largeValue,
- avg = largeValue + 1,
+ avg = largeValue + 1.0,
count = largeValue)
stats.merge(stats2)
stats.avg should be ((largeValue + 0.5) +- epsilon)
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index f77ca3e8fdb4..14e47b8f96fd 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -59,9 +59,9 @@ private[receiver] abstract class RateLimiter(conf: SparkConf)
extends Logging {
private[receiver] def updateRate(newRate: Long): Unit =
if (newRate > 0) {
if (maxRateLimit > 0) {
- rateLimiter.setRate(newRate.min(maxRateLimit))
+ rateLimiter.setRate(newRate.min(maxRateLimit).toDouble)
} else {
- rateLimiter.setRate(newRate)
+ rateLimiter.setRate(newRate.toDouble)
}
}
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 9b7014a6640d..bac82b5d331f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -206,8 +206,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
recordRateForAllStreams.data,
minBatchTime,
maxBatchTime,
- minRecordRate,
- maxRecordRate,
+ minRecordRate.toDouble,
+ maxRecordRate.toDouble,
"records/sec")
graphUIDataForRecordRateOfAllStreams.generateDataJs(jsCollector)
@@ -218,7 +218,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
schedulingDelay.timelineData(normalizedUnit),
minBatchTime,
maxBatchTime,
- minTime,
+ minTime.toDouble,
maxTime,
formattedUnit)
graphUIDataForSchedulingDelay.generateDataJs(jsCollector)
@@ -230,7 +230,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
processingTime.timelineData(normalizedUnit),
minBatchTime,
maxBatchTime,
- minTime,
+ minTime.toDouble,
maxTime,
formattedUnit, Some(batchInterval))
graphUIDataForProcessingTime.generateDataJs(jsCollector)
@@ -242,7 +242,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
totalDelay.timelineData(normalizedUnit),
minBatchTime,
maxBatchTime,
- minTime,
+ minTime.toDouble,
maxTime,
formattedUnit)
graphUIDataForTotalDelay.generateDataJs(jsCollector)
@@ -294,7 +294,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
{if (hasStream) {
<tr id="inputs-table" style="display: none;" >
<td colspan="3">
- {generateInputDStreamsTable(jsCollector, minBatchTime,
maxBatchTime, minRecordRate)}
+ {generateInputDStreamsTable(jsCollector, minBatchTime,
maxBatchTime, minRecordRate.toDouble)}
</td>
</tr>
}}
@@ -350,7 +350,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val content: Seq[Node] =
listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).flatMap {
case (streamId, recordRates) =>
generateInputDStreamRow(
- jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated)
+ jsCollector, streamId, recordRates, minX, maxX, minY,
maxYCalculated.toDouble)
}
// scalastyle:off
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index 57ec162b0d17..3c5fc8a08e67 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -44,7 +44,7 @@ private[streaming] object UIUtils {
*/
def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
if (milliseconds < 1000) {
- return (milliseconds, TimeUnit.MILLISECONDS)
+ return (milliseconds.toDouble, TimeUnit.MILLISECONDS)
}
val seconds = milliseconds.toDouble / 1000
if (seconds < 60) {
@@ -67,9 +67,9 @@ private[streaming] object UIUtils {
* will discard the fractional part.
*/
def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit
match {
- case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
- case TimeUnit.MICROSECONDS => milliseconds * 1000
- case TimeUnit.MILLISECONDS => milliseconds
+ case TimeUnit.NANOSECONDS => milliseconds.toDouble * 1000 * 1000
+ case TimeUnit.MICROSECONDS => milliseconds.toDouble * 1000
+ case TimeUnit.MILLISECONDS => milliseconds.toDouble
case TimeUnit.SECONDS => milliseconds / 1000.0
case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 7d9dfb100f61..e0ca22ad77d1 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -119,13 +119,13 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
}
// Batch proc time = batch interval, should increase allocation by 1
- addBatchProcTimeAndVerifyAllocation(batchDurationMillis) {
+ addBatchProcTimeAndVerifyAllocation(batchDurationMillis.toDouble) {
verifyTotalRequestedExecs(Some(3)) // one already allocated, increase
allocation by 1
verifyScaledDownExec(None)
}
// Batch proc time = batch interval * 2, should increase allocation by 2
- addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2) {
+ addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2.0) {
verifyTotalRequestedExecs(Some(4))
verifyScaledDownExec(None)
}
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
index b5a45fc317d0..a4faed150157 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
@@ -83,5 +83,5 @@ private[streaming] class ConstantEstimator(@volatile private
var rate: Long)
time: Long,
elements: Long,
processingDelay: Long,
- schedulingDelay: Long): Option[Double] = Some(rate)
+ schedulingDelay: Long): Option[Double] = Some(rate.toDouble)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]