This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
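The commit below replaces magic time constants such as `1000` and `1000000` with named `DateTimeUtils` constants and conversion functions from `java.util.concurrent.TimeUnit._`. As a quick orientation before the full diff, here is a minimal sketch of that before/after pattern; the object and variable names are illustrative only and do not come from the patch:

```scala
import java.util.concurrent.TimeUnit._

// Hypothetical sketch (not part of the patch) of the substitution the diff applies:
// bare `/ 1000` and `* 1000000L` become explicit TimeUnit conversions.
object MagicTimeConstantsSketch {
  def main(args: Array[String]): Unit = {
    val startTime = System.currentTimeMillis()

    // Before: divide by the magic constant 1000 to turn milliseconds into seconds.
    val waitedSecondsOld = (System.currentTimeMillis() - startTime) / 1000

    // After: the unit conversion is spelled out via TimeUnit.
    val waitedSecondsNew = MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)

    // Likewise, seconds -> microseconds uses SECONDS.toMicros instead of `* 1000000L`.
    val waitedMicros = SECONDS.toMicros(waitedSecondsNew)

    println(s"old=$waitedSecondsOld new=$waitedSecondsNew micros=$waitedMicros")
  }
}
```

Where a constant rather than a conversion call is needed, the diff uses `DateTimeUtils` values such as `MICROS_PER_SECOND` and `NANOS_PER_MILLIS`, which are themselves now defined via `TimeUnit` (for example `SECONDS.toMicros(1)`).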
The following commit(s) were added to refs/heads/master by this push:
     new a2a41b7  [SPARK-26978][CORE][SQL] Avoid magic time constants
a2a41b7 is described below

commit a2a41b7bf2bfdcd1cff242013716ac7bd84bdacd
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Tue Feb 26 09:08:12 2019 -0600

    [SPARK-26978][CORE][SQL] Avoid magic time constants

    ## What changes were proposed in this pull request?

    In the PR, I propose to refactor existing code related to date/time conversions, and
    replace constants like `1000` and `1000000` by `DateTimeUtils` constants and
    transformation functions from `java.util.concurrent.TimeUnit._`.

    ## How was this patch tested?

    The changes are tested by existing test suites.

    Closes #23878 from MaxGekk/magic-time-constants.

    Lead-authored-by: Maxim Gekk <max.g...@gmail.com>
    Co-authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../org/apache/spark/BarrierTaskContext.scala | 12 +-
 .../org/apache/spark/deploy/master/Master.scala | 5 +-
 .../org/apache/spark/benchmark/Benchmark.scala | 4 +-
 .../spark/sql/catalyst/expressions/Cast.scala | 34 ++++----
 .../expressions/codegen/CodeGenerator.scala | 6 +-
 .../catalyst/expressions/datetimeExpressions.scala | 8 +-
 .../spark/sql/catalyst/expressions/hash.scala | 6 +-
 .../sql/catalyst/optimizer/finishAnalysis.scala | 6 +-
 .../plans/logical/EventTimeWatermark.scala | 4 +-
 .../catalyst/rules/QueryExecutionMetering.scala | 4 +-
 .../spark/sql/catalyst/util/DateTimeUtils.scala | 90 ++++++++++++----------
 .../apache/spark/sql/catalyst/util/package.scala | 20 -----
 .../spark/sql/catalyst/expressions/CastSuite.scala | 25 +++---
 .../expressions/DateExpressionsSuite.scala | 17 ++--
 .../scala/org/apache/spark/sql/SparkSession.scala | 3 +-
 .../spark/sql/execution/DataSourceScanExec.scala | 10 ++-
 .../org/apache/spark/sql/execution/SortExec.scala | 7 +-
 .../execution/aggregate/HashAggregateExec.scala | 9 ++-
 .../aggregate/ObjectHashAggregateExec.scala | 4 +-
 .../sql/execution/basicPhysicalOperators.scala | 4 +-
 .../apache/spark/sql/execution/command/ddl.scala | 3 +-
 .../execution/exchange/BroadcastExchangeExec.scala | 6 +-
 .../sql/execution/joins/ShuffledHashJoinExec.scala | 4 +-
 .../streaming/EventTimeWatermarkExec.scala | 3 +-
 .../sql/execution/streaming/FileStreamSource.scala | 5 +-
 .../sql/execution/streaming/GroupStateImpl.scala | 3 +-
 .../sql/execution/streaming/ProgressReporter.scala | 8 +-
 .../streaming/continuous/ContinuousTrigger.scala | 2 +-
 .../spark/sql/streaming/ProcessingTime.scala | 2 +-
 .../org/apache/spark/sql/DateFunctionsSuite.scala | 47 ++++++-----
 .../streaming/sources/TextSocketStreamSuite.scala | 4 +-
 .../sql/streaming/EventTimeWatermarkSuite.scala | 9 ++-
 .../org/apache/spark/sql/hive/HiveInspectors.scala | 5 +-
 .../spark/sql/hive/client/HiveClientImpl.scala | 9 ++-
 .../apache/spark/sql/hive/client/HiveShim.scala | 2 +-
 .../streaming/util/RateLimitedOutputStream.scala | 2 +-
 36 files changed, 210 insertions(+), 182 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 6a497af..2d842b9 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -109,8 +109,8 @@ class BarrierTaskContext private[spark] (
override def run(): Unit = { logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) waiting " + s"under the global sync since $startTime, has been waiting for "
+ - s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch " + - s"is $barrierEpoch.") + s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " + + s"current barrier epoch is $barrierEpoch.") } } // Log the update of global sync every 60 seconds. @@ -126,14 +126,14 @@ class BarrierTaskContext private[spark] ( barrierEpoch += 1 logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " + "global sync successfully, waited for " + - s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch is " + - s"$barrierEpoch.") + s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " + + s"current barrier epoch is $barrierEpoch.") } catch { case e: SparkException => logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) failed " + "to perform global sync, waited for " + - s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch " + - s"is $barrierEpoch.") + s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " + + s"current barrier epoch is $barrierEpoch.") throw e } finally { timerTask.cancel() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b26da8a..3dd804b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -994,9 +994,10 @@ private[deploy] class Master( val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray for (worker <- toRemove) { if (worker.state != WorkerState.DEAD) { + val workerTimeoutSecs = TimeUnit.MILLISECONDS.toSeconds(workerTimeoutMs) logWarning("Removing %s because we got no heartbeat in %d seconds".format( - worker.id, workerTimeoutMs / 1000)) - removeWorker(worker, s"Not receiving heartbeat for ${workerTimeoutMs / 1000} seconds") + worker.id, workerTimeoutSecs)) + removeWorker(worker, s"Not receiving heartbeat for $workerTimeoutSecs seconds") } else { if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * workerTimeoutMs)) { workers -= worker // we've seen this DEAD worker in the UI, etc. 
for long enough; cull it diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala index bb389cd..df1ed28 100644 --- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala +++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala @@ -148,13 +148,13 @@ private[spark] class Benchmark( if (outputPerIteration) { // scalastyle:off - println(s"Iteration $i took ${runTime / 1000} microseconds") + println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds") // scalastyle:on } i += 1 } // scalastyle:off - println(s" Stopped after $i iterations, ${runTimes.sum / 1000000} ms") + println(s" Stopped after $i iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms") // scalastyle:on val best = runTimes.min val avg = runTimes.sum / runTimes.size diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index b20249f..d591c58 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import java.math.{BigDecimal => JavaBigDecimal} +import java.util.concurrent.TimeUnit._ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow @@ -25,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} import org.apache.spark.unsafe.types.UTF8String.{IntWrapper, LongWrapper} @@ -374,7 +376,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String case ByteType => buildCast[Byte](_, b => longToTimestamp(b.toLong)) case DateType => - buildCast[Int](_, d => DateTimeUtils.daysToMillis(d, timeZone) * 1000) + buildCast[Int](_, d => MILLISECONDS.toMicros(DateTimeUtils.daysToMillis(d, timeZone))) // TimestampWritable.decimalToTimestamp case DecimalType() => buildCast[Decimal](_, d => decimalToTimestamp(d)) @@ -387,21 +389,21 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String } private[this] def decimalToTimestamp(d: Decimal): Long = { - (d.toBigDecimal * 1000000L).longValue() + (d.toBigDecimal * MICROS_PER_SECOND).longValue() } private[this] def doubleToTimestamp(d: Double): Any = { - if (d.isNaN || d.isInfinite) null else (d * 1000000L).toLong + if (d.isNaN || d.isInfinite) null else (d * MICROS_PER_SECOND).toLong } // converting seconds to us - private[this] def longToTimestamp(t: Long): Long = t * 1000000L + private[this] def longToTimestamp(t: Long): Long = SECONDS.toMicros(t) // converting us to seconds private[this] def timestampToLong(ts: Long): Long = { - Math.floorDiv(ts, DateTimeUtils.MICROS_PER_SECOND) + Math.floorDiv(ts, MICROS_PER_SECOND) } // converting us to seconds in double private[this] def timestampToDouble(ts: Long): Double = { - ts / 1000000.0 + ts / MICROS_PER_SECOND.toDouble } // DateConverter @@ -411,7 +413,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String case TimestampType => // throw valid precision 
more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. - buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 1000L, timeZone)) + buildCast[Long](_, t => DateTimeUtils.millisToDays(MICROSECONDS.toMillis(t), timeZone)) } // IntervalConverter @@ -927,7 +929,8 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass) (c, evPrim, evNull) => code"""$evPrim = - org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays($c / 1000L, $tz);""" + org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays( + $c / $MICROS_PER_MILLIS, $tz);""" case _ => (c, evPrim, evNull) => code"$evNull = true;" } @@ -1034,7 +1037,8 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass) (c, evPrim, evNull) => code"""$evPrim = - org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMillis($c, $tz) * 1000;""" + org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMillis( + $c, $tz) * $MICROS_PER_MILLIS;""" case DecimalType() => (c, evPrim, evNull) => code"$evPrim = ${decimalToTimestampCode(c)};" case DoubleType => @@ -1043,7 +1047,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String if (Double.isNaN($c) || Double.isInfinite($c)) { $evNull = true; } else { - $evPrim = (long)($c * 1000000L); + $evPrim = (long)($c * $MICROS_PER_SECOND); } """ case FloatType => @@ -1052,7 +1056,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String if (Float.isNaN($c) || Float.isInfinite($c)) { $evNull = true; } else { - $evPrim = (long)($c * 1000000L); + $evPrim = (long)($c * $MICROS_PER_SECOND); } """ } @@ -1069,14 +1073,14 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String } private[this] def decimalToTimestampCode(d: ExprValue): Block = { - val block = inline"new java.math.BigDecimal(1000000L)" + val block = inline"new java.math.BigDecimal($MICROS_PER_SECOND)" code"($d.toBigDecimal().bigDecimal().multiply($block)).longValue()" } - private[this] def longToTimeStampCode(l: ExprValue): Block = code"$l * 1000000L" + private[this] def longToTimeStampCode(l: ExprValue): Block = code"$l * (long)$MICROS_PER_SECOND" private[this] def timestampToIntegerCode(ts: ExprValue): Block = - code"java.lang.Math.floorDiv($ts, 1000000L)" + code"java.lang.Math.floorDiv($ts, $MICROS_PER_SECOND)" private[this] def timestampToDoubleCode(ts: ExprValue): Block = - code"$ts / 1000000.0" + code"$ts / (double)$MICROS_PER_SECOND" private[this] def castToBooleanCode(from: DataType): CastFunction = from match { case StringType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 7c8f7cd..b9365f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -32,7 +32,7 @@ import org.codehaus.commons.compiler.CompileException import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, InternalCompilerException, SimpleCompiler} import org.codehaus.janino.util.ClassFile -import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException} +import 
org.apache.spark.{TaskContext, TaskKilledException} import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.CodegenMetrics @@ -40,10 +40,10 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform -import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.types._ import org.apache.spark.util.{ParentClassLoader, Utils} @@ -1372,7 +1372,7 @@ object CodeGenerator extends Logging { val startTime = System.nanoTime() val result = doCompile(code) val endTime = System.nanoTime() - def timeMs: Double = (endTime - startTime).toDouble / 1000000 + def timeMs: Double = (endTime - startTime).toDouble / NANOS_PER_MILLIS CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length) CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong) logInfo(s"Code generated in $timeMs ms") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index ec59502..4cb0031 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -694,7 +694,7 @@ abstract class UnixTime $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${ev.isNull}) { try { - ${ev.value} = $formatterName.parse(${eval1.value}.toString()) / 1000000L; + ${ev.value} = $formatterName.parse(${eval1.value}.toString()) / $MICROS_PER_SECOND; } catch (java.lang.IllegalArgumentException e) { ${ev.isNull} = true; } catch (java.text.ParseException e) { @@ -714,7 +714,7 @@ abstract class UnixTime s""" try { ${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $tz, $locale) - .parse($string.toString()) / 1000000L; + .parse($string.toString()) / $MICROS_PER_SECOND; } catch (java.lang.IllegalArgumentException e) { ${ev.isNull} = true; } catch (java.text.ParseException e) { @@ -733,7 +733,7 @@ abstract class UnixTime boolean ${ev.isNull} = ${eval1.isNull}; $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${ev.isNull}) { - ${ev.value} = ${eval1.value} / 1000000L; + ${ev.value} = ${eval1.value} / $MICROS_PER_SECOND; }""") case DateType => val tz = ctx.addReferenceObj("timeZone", timeZone) @@ -744,7 +744,7 @@ abstract class UnixTime boolean ${ev.isNull} = ${eval1.isNull}; $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${ev.isNull}) { - ${ev.value} = $dtu.daysToMillis(${eval1.value}, $tz) / 1000L; + ${ev.value} = $dtu.daysToMillis(${eval1.value}, $tz) / $MILLIS_PER_SECOND; }""") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala index 742a4f8..8d17b07 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import 
java.math.{BigDecimal, RoundingMode} import java.security.{MessageDigest, NoSuchAlgorithmException} +import java.util.concurrent.TimeUnit._ import java.util.zip.CRC32 import scala.annotation.tailrec @@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.hash.Murmur3_x86_32 @@ -863,8 +865,8 @@ object HiveHashFunction extends InterpretedHashFunction { * Mimics TimestampWritable.hashCode() in Hive */ def hashTimestamp(timestamp: Long): Long = { - val timestampInSeconds = timestamp / 1000000 - val nanoSecondsPortion = (timestamp % 1000000) * 1000 + val timestampInSeconds = MICROSECONDS.toSeconds(timestamp) + val nanoSecondsPortion = (timestamp % MICROS_PER_SECOND) * NANOS_PER_MICROS var result = timestampInSeconds result <<= 30 // the nanosecond part fits in 30 bits diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index fe196ec..4094864 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.optimizer +import java.util.concurrent.TimeUnit._ + import scala.collection.mutable import org.apache.spark.sql.catalyst.catalog.SessionCatalog @@ -65,7 +67,9 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { case CurrentDate(Some(timeZoneId)) => currentDates.getOrElseUpdate(timeZoneId, { Literal.create( - DateTimeUtils.millisToDays(timestamp / 1000L, DateTimeUtils.getTimeZone(timeZoneId)), + DateTimeUtils.millisToDays( + MICROSECONDS.toMillis(timestamp), + DateTimeUtils.getTimeZone(timeZoneId)), DateType) }) case CurrentTimestamp() => currentTime diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 7a927e1..8441c2c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.plans.logical +import java.util.concurrent.TimeUnit + import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.unsafe.types.CalendarInterval @@ -27,7 +29,7 @@ object EventTimeWatermark { def getDelayMs(delay: CalendarInterval): Long = { // We define month as `31 days` to simplify calculation. 
- val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 + val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31 delay.milliseconds + delay.months * millisPerMonth } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala index 62f7541..e4d5fa9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala @@ -21,6 +21,8 @@ import scala.collection.JavaConverters._ import com.google.common.util.concurrent.AtomicLongMap +import org.apache.spark.sql.catalyst.util.DateTimeUtils.NANOS_PER_SECOND + case class QueryExecutionMetering() { private val timeMap = AtomicLongMap.create[String]() private val numRunsMap = AtomicLongMap.create[String]() @@ -82,7 +84,7 @@ case class QueryExecutionMetering() { s""" |=== Metrics of Analyzer/Optimizer Rules === |Total number of runs: $totalNumRuns - |Total time: ${totalTime / 1000000000D} seconds + |Total time: ${totalTime / NANOS_PER_SECOND.toDouble} seconds | |$colRuleName $colRunTime $colNumRuns |$ruleMetrics diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index d714d29..627670a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -22,7 +22,7 @@ import java.time._ import java.time.Year.isLeap import java.time.temporal.IsoFields import java.util.{Locale, TimeZone} -import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeUnit._ import scala.util.control.NonFatal @@ -44,14 +44,18 @@ object DateTimeUtils { // see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian // it's 2440587.5, rounding up to compatible with Hive final val JULIAN_DAY_OF_EPOCH = 2440588 - final val SECONDS_PER_DAY = 60 * 60 * 24L - final val MICROS_PER_MILLIS = 1000L - final val MICROS_PER_SECOND = MICROS_PER_MILLIS * MILLIS_PER_SECOND - final val MILLIS_PER_SECOND = 1000L - final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L - final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY - final val NANOS_PER_MICROS = 1000L - final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L + + final val NANOS_PER_MICROS = MICROSECONDS.toNanos(1) + final val NANOS_PER_MILLIS = MILLISECONDS.toNanos(1) + final val NANOS_PER_SECOND = SECONDS.toNanos(1) + final val MICROS_PER_MILLIS = MILLISECONDS.toMicros(1) + final val MICROS_PER_SECOND = SECONDS.toMicros(1) + final val MICROS_PER_DAY = DAYS.toMicros(1) + final val MILLIS_PER_SECOND = SECONDS.toMillis(1) + final val MILLIS_PER_MINUTE = MINUTES.toMillis(1) + final val MILLIS_PER_HOUR = HOURS.toMillis(1) + final val MILLIS_PER_DAY = DAYS.toMillis(1) + final val SECONDS_PER_DAY = DAYS.toSeconds(1) // number of days between 1.1.1970 and 1.1.2001 final val to2001 = -11323 @@ -133,8 +137,8 @@ object DateTimeUtils { micros += MICROS_PER_SECOND seconds -= 1 } - val t = new Timestamp(seconds * 1000) - t.setNanos(micros.toInt * 1000) + val t = new Timestamp(SECONDS.toMillis(seconds)) + t.setNanos(MICROSECONDS.toNanos(micros).toInt) t } @@ -143,7 +147,7 @@ object DateTimeUtils { */ def fromJavaTimestamp(t: Timestamp): SQLTimestamp = { 
if (t != null) { - t.getTime() * 1000L + (t.getNanos().toLong / 1000) % 1000L + MILLISECONDS.toMicros(t.getTime()) + NANOSECONDS.toMicros(t.getNanos()) % NANOS_PER_MICROS } else { 0L } @@ -156,7 +160,7 @@ object DateTimeUtils { def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = { // use Long to avoid rounding errors val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - seconds * MICROS_PER_SECOND + nanoseconds / 1000L + SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds) } /** @@ -168,7 +172,7 @@ object DateTimeUtils { val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY val day = julian_us / MICROS_PER_DAY val micros = julian_us % MICROS_PER_DAY - (day.toInt, micros * 1000L) + (day.toInt, MICROSECONDS.toNanos(micros)) } /* @@ -186,7 +190,7 @@ object DateTimeUtils { * Converts milliseconds since epoch to SQLTimestamp. */ def fromMillis(millis: Long): SQLTimestamp = { - millis * MICROS_PER_MILLIS + MILLISECONDS.toMicros(millis) } /** @@ -329,7 +333,7 @@ object DateTimeUtils { val sign = if (tz.get.toChar == '-') -1 else 1 ZoneId.ofOffset("GMT", ZoneOffset.ofHoursMinutes(sign * segments(7), sign * segments(8))) } - val nanoseconds = TimeUnit.MICROSECONDS.toNanos(segments(6)) + val nanoseconds = MICROSECONDS.toNanos(segments(6)) val localTime = LocalTime.of(segments(3), segments(4), segments(5), nanoseconds.toInt) val localDate = if (justTime) { LocalDate.now(zoneId) @@ -346,8 +350,8 @@ object DateTimeUtils { } def instantToMicros(instant: Instant): Long = { - val sec = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND) - val result = Math.addExact(sec, instant.getNano / NANOS_PER_MICROS) + val us = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND) + val result = Math.addExact(us, NANOSECONDS.toMicros(instant.getNano)) result } @@ -420,14 +424,15 @@ object DateTimeUtils { } private def localTimestamp(microsec: SQLTimestamp, timeZone: TimeZone): SQLTimestamp = { - absoluteMicroSecond(microsec) + timeZone.getOffset(microsec / 1000) * 1000L + val zoneOffsetUs = MILLISECONDS.toMicros(timeZone.getOffset(MICROSECONDS.toMillis(microsec))) + absoluteMicroSecond(microsec) + zoneOffsetUs } /** * Returns the hour value of a given timestamp value. The timestamp is expressed in microseconds. */ def getHours(microsec: SQLTimestamp, timeZone: TimeZone): Int = { - ((localTimestamp(microsec, timeZone) / MICROS_PER_SECOND / 3600) % 24).toInt + (MICROSECONDS.toHours(localTimestamp(microsec, timeZone)) % 24).toInt } /** @@ -435,7 +440,7 @@ object DateTimeUtils { * microseconds. */ def getMinutes(microsec: SQLTimestamp, timeZone: TimeZone): Int = { - ((localTimestamp(microsec, timeZone) / MICROS_PER_SECOND / 60) % 60).toInt + (MICROSECONDS.toMinutes(localTimestamp(microsec, timeZone)) % 60).toInt } /** @@ -443,7 +448,7 @@ object DateTimeUtils { * microseconds. 
*/ def getSeconds(microsec: SQLTimestamp, timeZone: TimeZone): Int = { - ((localTimestamp(microsec, timeZone) / MICROS_PER_SECOND) % 60).toInt + (MICROSECONDS.toSeconds(localTimestamp(microsec, timeZone)) % 60).toInt } /** @@ -560,10 +565,10 @@ object DateTimeUtils { months: Int, microseconds: Long, timeZone: TimeZone): SQLTimestamp = { - val days = millisToDays(start / 1000L, timeZone) + val days = millisToDays(MICROSECONDS.toMillis(start), timeZone) val newDays = dateAddMonths(days, months) start + - daysToMillis(newDays, timeZone) * 1000L - daysToMillis(days, timeZone) * 1000L + + MILLISECONDS.toMicros(daysToMillis(newDays, timeZone) - daysToMillis(days, timeZone)) + microseconds } @@ -582,8 +587,8 @@ object DateTimeUtils { time2: SQLTimestamp, roundOff: Boolean, timeZone: TimeZone): Double = { - val millis1 = time1 / 1000L - val millis2 = time2 / 1000L + val millis1 = MICROSECONDS.toMillis(time1) + val millis2 = MICROSECONDS.toMillis(time2) val date1 = millisToDays(millis1, timeZone) val date2 = millisToDays(millis2, timeZone) val (year1, monthInYear1, dayInMonth1, daysToMonthEnd1) = splitDate(date1) @@ -599,12 +604,11 @@ object DateTimeUtils { } // using milliseconds can cause precision loss with more than 8 digits // we follow Hive's implementation which uses seconds - val secondsInDay1 = (millis1 - daysToMillis(date1, timeZone)) / 1000L - val secondsInDay2 = (millis2 - daysToMillis(date2, timeZone)) / 1000L + val secondsInDay1 = MILLISECONDS.toSeconds(millis1 - daysToMillis(date1, timeZone)) + val secondsInDay2 = MILLISECONDS.toSeconds(millis2 - daysToMillis(date2, timeZone)) val secondsDiff = (dayInMonth1 - dayInMonth2) * SECONDS_PER_DAY + secondsInDay1 - secondsInDay2 - // 2678400D is the number of seconds in 31 days - // every month is considered to be 31 days long in this function - val diff = monthDiff + secondsDiff / 2678400D + val secondsInMonth = DAYS.toSeconds(31) + val diff = monthDiff + secondsDiff / secondsInMonth.toDouble if (roundOff) { // rounding to 8 digits math.round(diff * 1e8) / 1e8 @@ -688,7 +692,7 @@ object DateTimeUtils { * Trunc level should be generated using `parseTruncLevel()`, should be between 1 and 8 */ def truncTimestamp(t: SQLTimestamp, level: Int, timeZone: TimeZone): SQLTimestamp = { - var millis = t / MICROS_PER_MILLIS + var millis = MICROSECONDS.toMillis(t) val truncated = level match { case TRUNC_TO_YEAR => val dDays = millisToDays(millis, timeZone) @@ -699,13 +703,13 @@ object DateTimeUtils { case TRUNC_TO_DAY => val offset = timeZone.getOffset(millis) millis += offset - millis - millis % (MILLIS_PER_SECOND * SECONDS_PER_DAY) - offset + millis - millis % MILLIS_PER_DAY - offset case TRUNC_TO_HOUR => val offset = timeZone.getOffset(millis) millis += offset - millis - millis % (60 * 60 * MILLIS_PER_SECOND) - offset + millis - millis % MILLIS_PER_HOUR - offset case TRUNC_TO_MINUTE => - millis - millis % (60 * MILLIS_PER_SECOND) + millis - millis % MILLIS_PER_MINUTE case TRUNC_TO_SECOND => millis - millis % MILLIS_PER_SECOND case TRUNC_TO_WEEK => @@ -761,8 +765,8 @@ object DateTimeUtils { if (guess != offset) { // fallback to do the reverse lookup using java.time.LocalDateTime // this should only happen near the start or end of DST - val localDate = LocalDate.ofEpochDay(TimeUnit.MILLISECONDS.toDays(millisLocal)) - val localTime = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos( + val localDate = LocalDate.ofEpochDay(MILLISECONDS.toDays(millisLocal)) + val localTime = LocalTime.ofNanoOfDay(MILLISECONDS.toNanos( Math.floorMod(millisLocal, 
MILLIS_PER_DAY))) val localDateTime = LocalDateTime.of(localDate, localTime) val millisEpoch = localDateTime.atZone(tz.toZoneId).toInstant.toEpochMilli @@ -787,15 +791,19 @@ object DateTimeUtils { ts } else { // get the human time using local time zone, that actually is in fromZone. - val localTs = ts + localZone.getOffset(ts / 1000L) * 1000L // in fromZone - localTs - getOffsetFromLocalMillis(localTs / 1000L, fromZone) * 1000L + val localZoneOffsetMs = localZone.getOffset(MICROSECONDS.toMillis(ts)) + val localTsUs = ts + MILLISECONDS.toMicros(localZoneOffsetMs) // in fromZone + val offsetFromLocalMs = getOffsetFromLocalMillis(MICROSECONDS.toMillis(localTsUs), fromZone) + localTsUs - MILLISECONDS.toMicros(offsetFromLocalMs) } if (toZone.getID == localZone.getID) { utcTs } else { - val localTs = utcTs + toZone.getOffset(utcTs / 1000L) * 1000L // in toZone + val toZoneOffsetMs = toZone.getOffset(MICROSECONDS.toMillis(utcTs)) + val localTsUs = utcTs + MILLISECONDS.toMicros(toZoneOffsetMs) // in toZone // treat it as local timezone, convert to UTC (we could get the expected human time back) - localTs - getOffsetFromLocalMillis(localTs / 1000L, localZone) * 1000L + val offsetFromLocalMs = getOffsetFromLocalMillis(MICROSECONDS.toMillis(localTsUs), localZone) + localTsUs - MILLISECONDS.toMicros(offsetFromLocalMs) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 7f5860e..a5dbc75 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -125,18 +125,6 @@ package object util extends Logging { new String(out.toByteArray, StandardCharsets.UTF_8) } - def stringOrNull(a: AnyRef): String = if (a == null) null else a.toString - - def benchmark[A](f: => A): A = { - val startTime = System.nanoTime() - val ret = f - val endTime = System.nanoTime() - // scalastyle:off println - println(s"${(endTime - startTime).toDouble / 1000000}ms") - // scalastyle:on println - ret - } - // Replaces attributes, string literals, complex type extractors with their pretty form so that // generated column names don't contain back-ticks or double-quotes. 
def usePrettyExpression(e: Expression): Expression = e transform { @@ -158,7 +146,6 @@ package object util extends Logging { def toPrettySQL(e: Expression): String = usePrettyExpression(e).sql - def escapeSingleQuotedString(str: String): String = { val builder = StringBuilder.newBuilder @@ -203,11 +190,4 @@ package object util extends Logging { def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = { truncatedString(seq, "", sep, "", maxFields) } - - /* FIX ME - implicit class debugLogging(a: Any) { - def debugLogging() { - org.apache.log4j.Logger.getLogger(a.getClass.getName).setLevel(org.apache.log4j.Level.DEBUG) - } - } */ } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index 11956e1..d812504 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -18,9 +18,8 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.{Date, Timestamp} -import java.util.{Calendar, Locale, TimeZone} - -import scala.util.Random +import java.util.{Calendar, TimeZone} +import java.util.concurrent.TimeUnit._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.Row @@ -29,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCoercion.numericPrecedence import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT +import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -321,13 +320,13 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation( cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId), TimestampType, timeZoneId), - c.getTimeInMillis * 1000) + MILLISECONDS.toMicros(c.getTimeInMillis)) c = Calendar.getInstance(TimeZoneGMT) c.set(2015, 10, 1, 2, 30, 0) checkEvaluation( cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId), TimestampType, timeZoneId), - c.getTimeInMillis * 1000) + MILLISECONDS.toMicros(c.getTimeInMillis)) } val gmtId = Option("GMT") @@ -522,17 +521,17 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(cast(ts, FloatType), 15.003f) checkEvaluation(cast(ts, DoubleType), 15.003) checkEvaluation(cast(cast(tss, ShortType), TimestampType), - DateTimeUtils.fromJavaTimestamp(ts) * 1000) + DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) checkEvaluation(cast(cast(tss, IntegerType), TimestampType), - DateTimeUtils.fromJavaTimestamp(ts) * 1000) + DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) checkEvaluation(cast(cast(tss, LongType), TimestampType), - DateTimeUtils.fromJavaTimestamp(ts) * 1000) + DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) checkEvaluation( - cast(cast(millis.toFloat / 1000, TimestampType), FloatType), - millis.toFloat / 1000) + cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType), + millis.toFloat / MILLIS_PER_SECOND) checkEvaluation( - cast(cast(millis.toDouble / 1000, TimestampType), DoubleType), - millis.toDouble / 1000) + cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), DoubleType), + millis.toDouble / MILLIS_PER_SECOND) 
checkEvaluation( cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT), Decimal(1)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index ce576ec..8bec32d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp} import java.text.SimpleDateFormat import java.util.{Calendar, Locale, TimeZone} import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeUnit._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection @@ -705,14 +706,14 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { 1000L) checkEvaluation( UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), - DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L) + MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) checkEvaluation( UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId), -1000L) checkEvaluation(UnixTimestamp( Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId), - DateTimeUtils.daysToMillis( - DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz) / 1000L) + MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis( + DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz))) val t1 = UnixTimestamp( CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long] val t2 = UnixTimestamp( @@ -727,7 +728,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { null) checkEvaluation( UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId), - DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L) + MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) checkEvaluation( UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null) } @@ -759,14 +760,14 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { 1000L) checkEvaluation( ToUnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), - DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L) + MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) checkEvaluation( ToUnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId), -1000L) checkEvaluation(ToUnixTimestamp( Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId), - DateTimeUtils.daysToMillis( - DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz) / 1000L) + MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis( + DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz))) val t1 = ToUnixTimestamp( CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long] val t2 = ToUnixTimestamp( @@ -780,7 +781,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { null) checkEvaluation(ToUnixTimestamp( Literal(date1), Literal.create(null, StringType), timeZoneId), - DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L) + 
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) checkEvaluation( ToUnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index a7bd2ef..f6fab76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import java.io.Closeable +import java.util.concurrent.TimeUnit._ import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConverters._ @@ -690,7 +691,7 @@ class SparkSession private( val ret = f val end = System.nanoTime() // scalastyle:off println - println(s"Time taken: ${(end - start) / 1000 / 1000} ms") + println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms") // scalastyle:on println ret } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index f852a52..3aed2ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -17,17 +17,18 @@ package org.apache.spark.sql.execution -import scala.collection.mutable.{ArrayBuffer, HashMap} +import java.util.concurrent.TimeUnit._ + +import scala.collection.mutable.HashMap import org.apache.commons.lang3.StringUtils -import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} +import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString @@ -180,7 +181,8 @@ case class FileSourceScanExec( val startTime = System.nanoTime() val ret = relation.location.listFiles(partitionFilters, dataFilters) driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum - val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000 + val timeTakenMs = NANOSECONDS.toMillis( + (System.nanoTime() - startTime) + optimizerMetadataTimeNs) driverMetrics("metadataTime") = timeTakenMs ret } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index f1470e4..0a955d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import java.util.concurrent.TimeUnit._ + import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.executor.TaskMetrics import org.apache.spark.rdd.RDD @@ -24,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.catalyst.plans.physical._ 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -106,7 +109,7 @@ case class SortExec( // figure out how many bytes we spilled for this operator. val spillSizeBefore = metrics.memoryBytesSpilled val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) - sortTime += sorter.getSortTimeNanos / 1000000 + sortTime += NANOSECONDS.toMillis(sorter.getSortTimeNanos) peakMemory += sorter.getPeakMemoryUsage spillSize += metrics.memoryBytesSpilled - spillSizeBefore metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage) @@ -157,7 +160,7 @@ case class SortExec( | long $spillSizeBefore = $metrics.memoryBytesSpilled(); | $addToSorterFuncName(); | $sortedIterator = $sorterVariable.sort(); - | $sortTime.add($sorterVariable.getSortTimeNanos() / 1000000); + | $sortTime.add($sorterVariable.getSortTimeNanos() / $NANOS_PER_MILLIS); | $peakMemory.add($sorterVariable.getPeakMemoryUsage()); | $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore); | $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage()); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 23ae1f0..25ff658 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.aggregate +import java.util.concurrent.TimeUnit._ + import org.apache.spark.TaskContext import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} import org.apache.spark.rdd.RDD @@ -28,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -135,7 +138,7 @@ case class HashAggregateExec( aggregationIterator } } - aggTime += (System.nanoTime() - beforeAgg) / 1000000 + aggTime += NANOSECONDS.toMillis(System.nanoTime() - beforeAgg) res } } @@ -240,7 +243,7 @@ case class HashAggregateExec( | $initAgg = true; | long $beforeAgg = System.nanoTime(); | $doAggFuncName(); - | $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000); + | $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS); | | // output the result | ${genResult.trim} @@ -726,7 +729,7 @@ case class HashAggregateExec( $initAgg = true; long $beforeAgg = System.nanoTime(); $doAggFuncName(); - $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000); + $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS); } // output the result diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 5b340ee..151da24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.aggregate +import 
java.util.concurrent.TimeUnit._ + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ @@ -132,7 +134,7 @@ case class ObjectHashAggregateExec( aggregationIterator } } - aggTime += (System.nanoTime() - beforeAgg) / 1000000 + aggTime += NANOSECONDS.toMillis(System.nanoTime() - beforeAgg) res } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 4352721..eacd35b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import java.util.concurrent.TimeUnit._ + import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration @@ -684,7 +686,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { // Note that we use .executeCollect() because we don't want to convert data to Scala types val rows: Array[InternalRow] = child.executeCollect() val beforeBuild = System.nanoTime() - longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000 + longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - beforeCollect) val dataSize = rows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum longMetric("dataSize") += dataSize diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 096481f..bcd8908 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.command import java.util.Locale +import java.util.concurrent.TimeUnit._ import scala.collection.{GenMap, GenSeq} import scala.collection.parallel.ForkJoinTaskSupport @@ -739,7 +740,7 @@ case class AlterTableRecoverPartitionsCommand( // do this in parallel. val batchSize = 100 partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch => - val now = System.currentTimeMillis() / 1000 + val now = MILLISECONDS.toSeconds(System.currentTimeMillis()) val parts = batch.map { case (spec, location) => val params = partitionStats.get(location.toString).map { case PartitionStatistics(numFiles, totalSize) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index d55d4fa..b9972b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -83,7 +83,7 @@ case class BroadcastExchangeExec( } val beforeBuild = System.nanoTime() - longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000 + longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - beforeCollect) // Construct the relation. 
val relation = mode.transform(input, Some(numRows)) @@ -105,11 +105,11 @@ case class BroadcastExchangeExec( } val beforeBroadcast = System.nanoTime() - longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000 + longMetric("buildTime") += NANOSECONDS.toMillis(beforeBroadcast - beforeBuild) // Broadcast the relation val broadcasted = sparkContext.broadcast(relation) - longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000 + longMetric("broadcastTime") += NANOSECONDS.toMillis(System.nanoTime() - beforeBroadcast) SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq) broadcasted diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 524804d..a8361fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.joins +import java.util.concurrent.TimeUnit._ + import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -53,7 +55,7 @@ case class ShuffledHashJoinExec( val start = System.nanoTime() val context = TaskContext.get() val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager()) - buildTime += (System.nanoTime() - start) / 1000000 + buildTime += NANOSECONDS.toMillis(System.nanoTime() - start) buildDataSize += relation.estimatedSize // This relation is usually used until the end of task. context.addTaskCompletionListener[Unit](_ => relation.close()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index 6fa7ee0..6d1131e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark +import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.unsafe.types.CalendarInterval @@ -99,7 +100,7 @@ case class EventTimeWatermarkExec( child.execute().mapPartitions { iter => val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output) iter.map { row => - eventTimeStats.add(getEventTime(row).getLong(0) / 1000) + eventTimeStats.add(getEventTime(row).getLong(0) / MICROS_PER_MILLIS) row } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 43b70ae..cef814b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.execution.streaming import java.net.URI - -import scala.collection.JavaConverters._ +import 
java.util.concurrent.TimeUnit._ import org.apache.hadoop.fs.{FileStatus, Path} @@ -237,7 +236,7 @@ class FileStreamSource( (status.getPath.toUri.toString, status.getModificationTime) } val endTime = System.nanoTime - val listingTimeMs = (endTime.toDouble - startTime) / 1000000 + val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime) if (listingTimeMs > 2000) { // Output a warning when listing files uses more than 2 seconds. logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index 7f65e3e..fcb230b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming import java.sql.Date +import java.util.concurrent.TimeUnit import org.apache.commons.lang3.StringUtils @@ -178,7 +179,7 @@ private[sql] class GroupStateImpl[S] private( throw new IllegalArgumentException(s"Provided duration ($duration) is not positive") } - val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 + val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31 cal.milliseconds + cal.months * millisPerMonth } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 2528351..859c327 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -26,7 +26,7 @@ import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress} import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream @@ -87,7 +87,7 @@ trait ProgressReporter extends Logging { private var lastNoDataProgressEventTime = Long.MinValue private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 - timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) + timestampFormat.setTimeZone(getTimeZone("UTC")) @volatile protected var currentStatus: StreamingQueryStatus = { @@ -147,10 +147,10 @@ trait ProgressReporter extends Logging { val executionStats = extractExecutionStats(hasNewData) val processingTimeSec = - (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000 + (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND val inputTimeSec = if (lastTriggerStartTimestamp >= 0) { - (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 1000 + (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND } else { Double.NaN } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala index caffcc3..fd0ff31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala @@ -53,7 +53,7 @@ private[sql] object ContinuousTrigger { if (cal.months > 0) { throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") } - new ContinuousTrigger(cal.microseconds / 1000) + new ContinuousTrigger(TimeUnit.MICROSECONDS.toMillis(cal.microseconds)) } def apply(interval: Duration): ContinuousTrigger = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala index 236bd55..38b0776 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala @@ -91,7 +91,7 @@ object ProcessingTime { if (cal.months > 0) { throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") } - new ProcessingTime(cal.microseconds / 1000) + new ProcessingTime(TimeUnit.MICROSECONDS.toMillis(cal.microseconds)) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 62bb72d..b06d52d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import java.sql.{Date, Timestamp} import java.text.SimpleDateFormat import java.util.Locale +import java.util.concurrent.TimeUnit import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.functions._ @@ -515,6 +516,8 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext { Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new Timestamp(-1000000))))) } + private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis) + test("unix_timestamp") { val date1 = Date.valueOf("2015-07-24") val date2 = Date.valueOf("2015-07-25") @@ -527,21 +530,21 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext { val fmt = "yyyy/MM/dd HH:mm:ss.S" val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") checkAnswer(df.select(unix_timestamp(col("ts"))), Seq( - Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L))) + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) checkAnswer(df.select(unix_timestamp(col("ss"))), Seq( - Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L))) + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) checkAnswer(df.select(unix_timestamp(col("d"), fmt)), Seq( - Row(date1.getTime / 1000L), Row(date2.getTime / 1000L))) + Row(secs(date1.getTime)), Row(secs(date2.getTime)))) checkAnswer(df.select(unix_timestamp(col("s"), fmt)), Seq( - Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L))) + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) checkAnswer(df.selectExpr("unix_timestamp(ts)"), Seq( - Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L))) + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) checkAnswer(df.selectExpr("unix_timestamp(ss)"), Seq( - Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L))) + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) checkAnswer(df.selectExpr(s"unix_timestamp(d, '$fmt')"), 
     checkAnswer(df.selectExpr(s"unix_timestamp(d, '$fmt')"), Seq(
-      Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
+      Row(secs(date1.getTime)), Row(secs(date2.getTime))))
     checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     val x1 = "2015-07-24 10:00:00"
     val x2 = "2015-25-07 02:02:02"
@@ -552,13 +555,13 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     val df1 = Seq(x1, x2, x3, x4).toDF("x")
     checkAnswer(df1.select(unix_timestamp(col("x"))), Seq(
-      Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+      Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
     checkAnswer(df1.selectExpr("unix_timestamp(x)"), Seq(
-      Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+      Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
     checkAnswer(df1.select(unix_timestamp(col("x"), "yyyy-dd-MM HH:mm:ss")), Seq(
-      Row(null), Row(ts2.getTime / 1000L), Row(null), Row(null)))
+      Row(null), Row(secs(ts2.getTime)), Row(null), Row(null)))
     checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq(
-      Row(ts4.getTime / 1000L), Row(null), Row(ts3.getTime / 1000L), Row(null)))
+      Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null)))
     // invalid format
     checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd aa:HH:ss')"), Seq(
@@ -570,10 +573,12 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     val ts5 = Timestamp.valueOf("2016-02-29 00:00:00")
     val df2 = Seq(y1, y2).toDF("y")
     checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(
-      Row(ts5.getTime / 1000L), Row(null)))
+      Row(secs(ts5.getTime)), Row(null)))
     val now = sql("select unix_timestamp()").collect().head.getLong(0)
-    checkAnswer(sql(s"select cast ($now as timestamp)"), Row(new java.util.Date(now * 1000)))
+    checkAnswer(
+      sql(s"select cast ($now as timestamp)"),
+      Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now))))
   }
   test("to_unix_timestamp") {
     val date1 = Date.valueOf("2015-07-24")
     val date2 = Date.valueOf("2015-07-25")
@@ -588,13 +593,13 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     val fmt = "yyyy/MM/dd HH:mm:ss.S"
     val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss")
     checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq(
-      Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
+      Row(secs(date1.getTime)), Row(secs(date2.getTime))))
     checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     val x1 = "2015-07-24 10:00:00"
     val x2 = "2015-25-07 02:02:02"
@@ -605,9 +610,9 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     val df1 = Seq(x1, x2, x3, x4).toDF("x")
     checkAnswer(df1.selectExpr("to_unix_timestamp(x)"), Seq(
-      Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+      Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
     checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq(
-      Row(ts4.getTime / 1000L), Row(null), Row(ts3.getTime / 1000L), Row(null)))
+      Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null)))
     // february
     val y1 = "2016-02-29"
@@ -615,7 +620,7 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     val ts5 = Timestamp.valueOf("2016-02-29 00:00:00")
     val df2 = Seq(y1, y2).toDF("y")
     checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(
-      Row(ts5.getTime / 1000L), Row(null)))
+      Row(secs(ts5.getTime)), Row(null)))
     // invalid format
     checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd bb:HH:ss')"), Seq(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 33c65d7..e1769fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import java.nio.channels.ServerSocketChannel
 import java.sql.Timestamp
 import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit._
 import scala.collection.JavaConverters._
@@ -29,7 +30,6 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
@@ -168,7 +168,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
       // Timestamp for rate stream is round to second which leads to milliseconds lost, that will
       // make batch1stamp smaller than current timestamp if both of them are in the same second.
       // Comparing by second to make sure the correct behavior.
-      assert(batch1Stamp.getTime >= curr / 1000 * 1000)
+      assert(batch1Stamp.getTime >= SECONDS.toMillis(MILLISECONDS.toSeconds(curr)))
       assert(!batch2Stamp.before(batch1Stamp))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index b79770a..1ff9dec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -21,6 +21,7 @@ import java.{util => ju}
 import java.io.File
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Date, Locale}
+import java.util.concurrent.TimeUnit._
 import org.apache.commons.io.FileUtils
 import org.scalatest.{BeforeAndAfter, Matchers}
@@ -28,7 +29,6 @@ import org.scalatest.{BeforeAndAfter, Matchers}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.internal.SQLConf
@@ -347,12 +347,13 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
     testStream(aggWithWatermark)(
-      AddData(input, currentTimeMs / 1000),
+      AddData(input, MILLISECONDS.toSeconds(currentTimeMs)),
       CheckAnswer(),
-      AddData(input, currentTimeMs / 1000),
+      AddData(input, MILLISECONDS.toSeconds(currentTimeMs)),
       CheckAnswer(),
       assertEventStats { e =>
-        assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000)
+        assert(timestampFormat.parse(e.get("max")).getTime ===
+          SECONDS.toMillis(MILLISECONDS.toSeconds((currentTimeMs))))
         val watermarkTime = timestampFormat.parse(e.get("watermark"))
         val monthDiff = monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime)
         // monthsSinceEpoch is like `math.floor(num)`, so monthDiff has two possible values.
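The test hunks above keep replacing the `x / 1000` and `x / 1000 * 1000` idioms with MILLISECONDS.toSeconds and SECONDS.toMillis; `secs` is the small helper the DateFunctionsSuite now defines for the first form. A minimal standalone sketch of both helpers, with an object name made up for illustration:

```scala
import java.util.concurrent.TimeUnit.{MILLISECONDS, SECONDS}

object EpochTimes {
  // Epoch milliseconds to whole epoch seconds, as unix_timestamp() returns
  // (same result as `millis / 1000L`, but the unit change is explicit).
  def secs(millis: Long): Long = MILLISECONDS.toSeconds(millis)

  // Drop the sub-second part of an epoch-millisecond timestamp,
  // the replacement for the `millis / 1000 * 1000` idiom.
  def floorToSecond(millis: Long): Long = SECONDS.toMillis(MILLISECONDS.toSeconds(millis))

  def main(args: Array[String]): Unit = {
    val t = 1551167292345L            // some epoch-millisecond timestamp
    println(secs(t))                  // 1551167292
    println(floorToSecond(t))         // 1551167292000
  }
}
```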
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 4dec2f7..178fced 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.hive
 import java.lang.reflect.{ParameterizedType, Type, WildcardType}
+import java.util.concurrent.TimeUnit._
 import scala.collection.JavaConverters._
@@ -460,7 +461,7 @@ private[hive] trait HiveInspectors {
       _ => constant
     case poi: WritableConstantTimestampObjectInspector =>
       val t = poi.getWritableConstantValue
-      val constant = t.getSeconds * 1000000L + t.getNanos / 1000L
+      val constant = SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
       _ => constant
     case poi: WritableConstantIntObjectInspector =>
       val constant = poi.getWritableConstantValue.get()
@@ -629,7 +630,7 @@ private[hive] trait HiveInspectors {
       data: Any => {
         if (data != null) {
           val t = x.getPrimitiveWritableObject(data)
-          t.getSeconds * 1000000L + t.getNanos / 1000L
+          SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
         } else {
           null
         }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bfe19c2..77ac606 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
 import java.io.{File, PrintStream}
 import java.lang.{Iterable => JIterable}
 import java.util.{Locale, Map => JMap}
+import java.util.concurrent.TimeUnit._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -948,8 +949,8 @@ private[hive] object HiveClientImpl {
     hiveTable.setFields(schema.asJava)
     hiveTable.setPartCols(partCols.asJava)
     userName.foreach(hiveTable.setOwner)
-    hiveTable.setCreateTime((table.createTime / 1000).toInt)
-    hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
+    hiveTable.setCreateTime(MILLISECONDS.toSeconds(table.createTime).toInt)
+    hiveTable.setLastAccessTime(MILLISECONDS.toSeconds(table.lastAccessTime).toInt)
     table.storage.locationUri.map(CatalogUtils.URIToString).foreach { loc =>
       hiveTable.getTTable.getSd.setLocation(loc)}
     table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
@@ -1012,8 +1013,8 @@ private[hive] object HiveClientImpl {
     tpart.setTableName(ht.getTableName)
     tpart.setValues(partValues.asJava)
     tpart.setSd(storageDesc)
-    tpart.setCreateTime((p.createTime / 1000).toInt)
-    tpart.setLastAccessTime((p.lastAccessTime / 1000).toInt)
+    tpart.setCreateTime(MILLISECONDS.toSeconds(p.createTime).toInt)
+    tpart.setLastAccessTime(MILLISECONDS.toSeconds(p.lastAccessTime).toInt)
     tpart.setParameters(mutable.Map(p.parameters.toSeq: _*).asJava)
     new HivePartition(ht, tpart)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index a8ebb23..af5ea59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -516,7 +516,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       f.className,
       null,
       PrincipalType.USER,
-      (System.currentTimeMillis / 1000).toInt,
+      TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis).toInt,
       FunctionType.JAVA,
       resourceUris.asJava)
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
index 29cc1fa..342f20f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -79,7 +79,7 @@ class RateLimitedOutputStream(out: OutputStream, desiredBytesPerSec: Int)
     } else {
       // Calculate how much time we should sleep to bring ourselves to the desired rate.
       val targetTimeInMillis = bytesWrittenSinceSync * 1000 / desiredBytesPerSec
-      val elapsedTimeInMillis = elapsedNanosecs / 1000000
+      val elapsedTimeInMillis = NANOSECONDS.toMillis(elapsedNanosecs)
       val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
       if (sleepTimeInMillis > 0) {
         logTrace("Natural rate is " + rate + " per second but desired rate is " +
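The Hive and streaming hunks close out the same theme: whole seconds plus a nanosecond remainder are combined into microseconds, elapsed nanoseconds become milliseconds, and epoch milliseconds become the whole seconds Hive metadata expects, all through TimeUnit. A standalone recap of the three conversions that recur in this patch, with assumed example values rather than code from the repository:

```scala
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS, SECONDS}

object TimeUnitRecap {
  // A Hive timestamp carries whole seconds plus a nanosecond-of-second field;
  // Catalyst stores microseconds, so the two parts are converted and summed
  // (the old form was `seconds * 1000000L + nanos / 1000L`).
  def hiveTimestampToMicros(seconds: Long, nanosOfSecond: Int): Long =
    SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanosOfSecond.toLong)

  // Elapsed nanoseconds from System.nanoTime to milliseconds
  // (the old form was `elapsedNanosecs / 1000000`).
  def elapsedMillis(startNanos: Long, endNanos: Long): Long =
    NANOSECONDS.toMillis(endNanos - startNanos)

  // Epoch milliseconds to whole epoch seconds
  // (the old form was `(millis / 1000).toInt`).
  def toEpochSeconds(millis: Long): Int =
    MILLISECONDS.toSeconds(millis).toInt

  def main(args: Array[String]): Unit = {
    println(hiveTimestampToMicros(seconds = 12L, nanosOfSecond = 345678)) // 12000345
    println(elapsedMillis(0L, 2500000L))                                  // 2
    println(toEpochSeconds(System.currentTimeMillis()))
  }
}
```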