This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 633de74 [SPARK-26740][SQL][BRANCH-2.4] Read timestamp/date column stats written by Spark 3.0 633de74 is described below commit 633de74b60f2399a68dc6aa2c161dbb5568679e8 Author: Maxim Gekk <max.g...@gmail.com> AuthorDate: Tue Feb 19 11:46:42 2019 +0800 [SPARK-26740][SQL][BRANCH-2.4] Read timestamp/date column stats written by Spark 3.0 ## What changes were proposed in this pull request? - Backport of #23662 to `branch-2.4` - Added `Timestamp`/`DateFormatter` - Set version of column stats to `1` to keep backward compatibility with previous versions ## How was this patch tested? The changes were tested by `StatisticsCollectionSuite` and by `StatisticsSuite`. Closes #23809 from MaxGekk/column-stats-time-date-2.4. Lead-authored-by: Maxim Gekk <max.g...@gmail.com> Co-authored-by: Maxim Gekk <maxim.g...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/catalog/interface.scala | 32 ++++-- .../sql/catalyst/plans/logical/Statistics.scala | 7 +- .../spark/sql/catalyst/util/DateFormatter.scala | 62 +++++++++++ .../catalyst/util/DateTimeFormatterHelper.scala | 78 ++++++++++++++ .../spark/sql/catalyst/util/DateTimeUtils.scala | 15 ++- .../sql/catalyst/util/TimestampFormatter.scala | 87 +++++++++++++++ .../spark/sql/catalyst/plans/SQLHelper.scala | 64 +++++++++++ .../sql/catalyst/util/DateTimeTestUtils.scala | 11 ++ .../apache/spark/sql/util/DateFormatterSuite.scala | 98 +++++++++++++++++ .../spark/sql/util/TimestampFormatterSuite.scala | 120 +++++++++++++++++++++ 10 files changed, 561 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 30ded13..6453264 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -413,7 +413,8 @@ case class CatalogColumnStat( nullCount: Option[BigInt] = None, avgLen: Option[Long] = None, maxLen: Option[Long] = None, - histogram: Option[Histogram] = None) { + histogram: Option[Histogram] = None, + version: Int = CatalogColumnStat.VERSION) { /** * Returns a map from string to string that can be used to serialize the column stats. @@ -427,7 +428,7 @@ case class CatalogColumnStat( */ def toMap(colName: String): Map[String, String] = { val map = new scala.collection.mutable.HashMap[String, String] - map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1") + map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", CatalogColumnStat.VERSION.toString) distinctCount.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString) } @@ -450,12 +451,13 @@ case class CatalogColumnStat( dataType: DataType): ColumnStat = ColumnStat( distinctCount = distinctCount, - min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)), - max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)), + min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType, version)), + max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType, version)), nullCount = nullCount, avgLen = avgLen, maxLen = maxLen, - histogram = histogram) + histogram = histogram, + version = version) } object CatalogColumnStat extends Logging { @@ -470,14 +472,23 @@ object CatalogColumnStat extends Logging { private val KEY_MAX_LEN = "maxLen" private val KEY_HISTOGRAM = "histogram" + val VERSION = 1 + + private def getTimestampFormatter(): TimestampFormatter = { + TimestampFormatter(format = "yyyy-MM-dd HH:mm:ss.SSSSSS", timeZone = DateTimeUtils.TimeZoneUTC) + } + /** * Converts from string representation of data type to the corresponding Catalyst data type. */ - def fromExternalString(s: String, name: String, dataType: DataType): Any = { + def fromExternalString(s: String, name: String, dataType: DataType, version: Int): Any = { dataType match { case BooleanType => s.toBoolean - case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s)) - case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s)) + case DateType if version == 1 => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s)) + case DateType => DateFormatter().parse(s) + case TimestampType if version == 1 => + DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s)) + case TimestampType => getTimestampFormatter().parse(s) case ByteType => s.toByte case ShortType => s.toShort case IntegerType => s.toInt @@ -530,7 +541,8 @@ object CatalogColumnStat extends Logging { nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)), avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong), maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong), - histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize) + histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize), + version = map(s"${colName}.${KEY_VERSION}").toInt )) } catch { case NonFatal(e) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala index b3a4886..d0ca9eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala @@ -93,6 +93,7 @@ case class Statistics( * @param avgLen average length of the values. For fixed-length types, this should be a constant. * @param maxLen maximum length of the values. For fixed-length types, this should be a constant. * @param histogram histogram of the values + * @param version version of statistics saved to or retrieved from the catalog */ case class ColumnStat( distinctCount: Option[BigInt] = None, @@ -101,7 +102,8 @@ case class ColumnStat( nullCount: Option[BigInt] = None, avgLen: Option[Long] = None, maxLen: Option[Long] = None, - histogram: Option[Histogram] = None) { + histogram: Option[Histogram] = None, + version: Int = CatalogColumnStat.VERSION) { // Are distinctCount and nullCount statistics defined? val hasCountStats = distinctCount.isDefined && nullCount.isDefined @@ -120,7 +122,8 @@ case class ColumnStat( nullCount = nullCount, avgLen = avgLen, maxLen = maxLen, - histogram = histogram) + histogram = histogram, + version = version) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala new file mode 100644 index 0000000..9535a36 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import java.time.{Instant, ZoneId} +import java.util.Locale + +import org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToDays + +sealed trait DateFormatter extends Serializable { + def parse(s: String): Int // returns days since epoch + def format(days: Int): String +} + +class Iso8601DateFormatter( + pattern: String, + locale: Locale) extends DateFormatter with DateTimeFormatterHelper { + + @transient + private lazy val formatter = getOrCreateFormatter(pattern, locale) + private val UTC = ZoneId.of("UTC") + + private def toInstant(s: String): Instant = { + val temporalAccessor = formatter.parse(s) + toInstantWithZoneId(temporalAccessor, UTC) + } + + override def parse(s: String): Int = instantToDays(toInstant(s)) + + override def format(days: Int): String = { + val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY) + formatter.withZone(UTC).format(instant) + } +} + +object DateFormatter { + val defaultPattern: String = "yyyy-MM-dd" + val defaultLocale: Locale = Locale.US + + def apply(format: String, locale: Locale): DateFormatter = { + new Iso8601DateFormatter(format, locale) + } + + def apply(format: String): DateFormatter = apply(format, defaultLocale) + + def apply(): DateFormatter = apply(defaultPattern) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala new file mode 100644 index 0000000..81ad6ad --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import java.time._ +import java.time.chrono.IsoChronology +import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle} +import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries} +import java.util.Locale + +import com.google.common.cache.CacheBuilder + +import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._ + +trait DateTimeFormatterHelper { + protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor, zoneId: ZoneId): Instant = { + val localTime = if (temporalAccessor.query(TemporalQueries.localTime) == null) { + LocalTime.ofNanoOfDay(0) + } else { + LocalTime.from(temporalAccessor) + } + val localDate = LocalDate.from(temporalAccessor) + val localDateTime = LocalDateTime.of(localDate, localTime) + val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId) + Instant.from(zonedDateTime) + } + + // Gets a formatter from the cache or creates new one. The buildFormatter method can be called + // a few times with the same parameters in parallel if the cache does not contain values + // associated to those parameters. Since the formatter is immutable, it does not matter. + // In this way, synchronised is intentionally omitted in this method to make parallel calls + // less synchronised. + // The Cache.get method is not used here to avoid creation of additional instances of Callable. + protected def getOrCreateFormatter(pattern: String, locale: Locale): DateTimeFormatter = { + val key = (pattern, locale) + var formatter = cache.getIfPresent(key) + if (formatter == null) { + formatter = buildFormatter(pattern, locale) + cache.put(key, formatter) + } + formatter + } +} + +private object DateTimeFormatterHelper { + val cache = CacheBuilder.newBuilder() + .maximumSize(128) + .build[(String, Locale), DateTimeFormatter]() + + def buildFormatter(pattern: String, locale: Locale): DateTimeFormatter = { + new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .appendPattern(pattern) + .parseDefaulting(ChronoField.ERA, 1) + .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1) + .parseDefaulting(ChronoField.DAY_OF_MONTH, 1) + .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) + .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) + .toFormatter(locale) + .withChronology(IsoChronology.INSTANCE) + .withResolverStyle(ResolverStyle.STRICT) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 81d7274..f01a769 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.util import java.sql.{Date, Timestamp} import java.text.{DateFormat, SimpleDateFormat} +import java.time.Instant import java.util.{Calendar, Locale, TimeZone} import java.util.concurrent.ConcurrentHashMap import java.util.function.{Function => JFunction} @@ -50,7 +51,7 @@ object DateTimeUtils { final val MILLIS_PER_SECOND = 1000L final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY - + final val NANOS_PER_MICROS = 1000L final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L // number of days in 400 years @@ -440,6 +441,18 @@ object DateTimeUtils { Some(c.getTimeInMillis * 1000 + segments(6)) } + def instantToMicros(instant: Instant): Long = { + val sec = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND) + val result = Math.addExact(sec, instant.getNano / NANOS_PER_MICROS) + result + } + + def instantToDays(instant: Instant): Int = { + val seconds = instant.getEpochSecond + val days = Math.floorDiv(seconds, SECONDS_PER_DAY) + days.toInt + } + /** * Parses a given UTF8 date string to a corresponding [[Int]] value. * The return type is [[Option]] in order to distinguish between 0 and null. The following diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala new file mode 100644 index 0000000..4ec61e1 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import java.text.ParseException +import java.time._ +import java.time.format.DateTimeParseException +import java.time.temporal.TemporalQueries +import java.util.{Locale, TimeZone} + +import org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToMicros + +sealed trait TimestampFormatter extends Serializable { + /** + * Parses a timestamp in a string and converts it to microseconds. + * + * @param s - string with timestamp to parse + * @return microseconds since epoch. + * @throws ParseException can be thrown by legacy parser + * @throws DateTimeParseException can be thrown by new parser + * @throws DateTimeException unable to obtain local date or time + */ + @throws(classOf[ParseException]) + @throws(classOf[DateTimeParseException]) + @throws(classOf[DateTimeException]) + def parse(s: String): Long + def format(us: Long): String +} + +class Iso8601TimestampFormatter( + pattern: String, + timeZone: TimeZone, + locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper { + @transient + private lazy val formatter = getOrCreateFormatter(pattern, locale) + + private def toInstant(s: String): Instant = { + val temporalAccessor = formatter.parse(s) + if (temporalAccessor.query(TemporalQueries.offset()) == null) { + toInstantWithZoneId(temporalAccessor, timeZone.toZoneId) + } else { + Instant.from(temporalAccessor) + } + } + + override def parse(s: String): Long = instantToMicros(toInstant(s)) + + override def format(us: Long): String = { + val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND) + val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND) + val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS) + + formatter.withZone(timeZone.toZoneId).format(instant) + } +} + +object TimestampFormatter { + val defaultPattern: String = "yyyy-MM-dd HH:mm:ss" + val defaultLocale: Locale = Locale.US + + def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = { + new Iso8601TimestampFormatter(format, timeZone, locale) + } + + def apply(format: String, timeZone: TimeZone): TimestampFormatter = { + apply(format, timeZone, defaultLocale) + } + + def apply(timeZone: TimeZone): TimestampFormatter = { + apply(defaultPattern, timeZone, defaultLocale) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala new file mode 100644 index 0000000..4d869d7 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.plans + +import java.io.File + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +trait SQLHelper { + + /** + * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL + * configurations. + */ + protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { + val conf = SQLConf.get + val (keys, values) = pairs.unzip + val currentValues = keys.map { key => + if (conf.contains(key)) { + Some(conf.getConfString(key)) + } else { + None + } + } + (keys, values).zipped.foreach { (k, v) => + if (SQLConf.staticConfKeys.contains(k)) { + throw new AnalysisException(s"Cannot modify the value of a static config: $k") + } + conf.setConfString(k, v) + } + try f finally { + keys.zip(currentValues).foreach { + case (key, Some(value)) => conf.setConfString(key, value) + case (key, None) => conf.unsetConf(key) + } + } + } + + /** + * Generates a temporary path without creating the actual file/directory, then pass it to `f`. If + * a file/directory is created there by `f`, it will be delete after `f` returns. + */ + protected def withTempPath(f: File => Unit): Unit = { + val path = Utils.createTempDir() + path.delete() + try f(path) finally Utils.deleteRecursively(path) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala index 0c1feb3..66d8d28 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala @@ -26,6 +26,17 @@ object DateTimeTestUtils { val ALL_TIMEZONES: Seq[TimeZone] = TimeZone.getAvailableIDs.toSeq.map(TimeZone.getTimeZone) + val outstandingTimezonesIds: Seq[String] = Seq( + "UTC", + "PST", + "CET", + "Africa/Dakar", + "America/Los_Angeles", + "Antarctica/Vostok", + "Asia/Hong_Kong", + "Europe/Amsterdam") + val outstandingTimezones: Seq[TimeZone] = outstandingTimezonesIds.map(TimeZone.getTimeZone) + def withDefaultTimeZone[T](newDefaultTimeZone: TimeZone)(block: => T): T = { val originalDefaultTimeZone = TimeZone.getDefault try { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala new file mode 100644 index 0000000..602542f --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import java.time.LocalDate + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.internal.SQLConf + +class DateFormatterSuite extends SparkFunSuite with SQLHelper { + test("parsing dates") { + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter() + val daysSinceEpoch = formatter.parse("2018-12-02") + assert(daysSinceEpoch === 17867) + } + } + } + + test("format dates") { + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter() + val date = formatter.format(17867) + assert(date === "2018-12-02") + } + } + } + + test("roundtrip date -> days -> date") { + Seq( + "0050-01-01", + "0953-02-02", + "1423-03-08", + "1969-12-31", + "1972-08-25", + "1975-09-26", + "2018-12-12", + "2038-01-01", + "5010-11-17").foreach { date => + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter() + val days = formatter.parse(date) + val formatted = formatter.format(days) + assert(date === formatted) + } + } + } + } + + test("roundtrip days -> date -> days") { + Seq( + -701265, + -371419, + -199722, + -1, + 0, + 967, + 2094, + 17877, + 24837, + 1110657).foreach { days => + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter() + val date = formatter.format(days) + val parsed = formatter.parse(date) + assert(days === parsed) + } + } + } + } + + test("parsing date without explicit day") { + val formatter = DateFormatter("yyyy MMM") + val daysSinceEpoch = formatter.parse("2018 Dec") + assert(daysSinceEpoch === LocalDate.of(2018, 12, 1).toEpochDay) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala new file mode 100644 index 0000000..192ca13 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import java.time.{LocalDateTime, ZoneOffset} +import java.util.TimeZone +import java.util.concurrent.TimeUnit + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, TimestampFormatter} + +class TimestampFormatterSuite extends SparkFunSuite with SQLHelper { + + test("parsing timestamps using time zones") { + val localDate = "2018-12-02T10:11:12.001234" + val expectedMicros = Map( + "UTC" -> 1543745472001234L, + "PST" -> 1543774272001234L, + "CET" -> 1543741872001234L, + "Africa/Dakar" -> 1543745472001234L, + "America/Los_Angeles" -> 1543774272001234L, + "Antarctica/Vostok" -> 1543723872001234L, + "Asia/Hong_Kong" -> 1543716672001234L, + "Europe/Amsterdam" -> 1543741872001234L) + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + val formatter = TimestampFormatter( + "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", + TimeZone.getTimeZone(timeZone)) + val microsSinceEpoch = formatter.parse(localDate) + assert(microsSinceEpoch === expectedMicros(timeZone)) + } + } + + test("format timestamps using time zones") { + val microsSinceEpoch = 1543745472001234L + val expectedTimestamp = Map( + "UTC" -> "2018-12-02T10:11:12.001234", + "PST" -> "2018-12-02T02:11:12.001234", + "CET" -> "2018-12-02T11:11:12.001234", + "Africa/Dakar" -> "2018-12-02T10:11:12.001234", + "America/Los_Angeles" -> "2018-12-02T02:11:12.001234", + "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234", + "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234", + "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234") + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + val formatter = TimestampFormatter( + "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", + TimeZone.getTimeZone(timeZone)) + val timestamp = formatter.format(microsSinceEpoch) + assert(timestamp === expectedTimestamp(timeZone)) + } + } + + test("roundtrip micros -> timestamp -> micros using timezones") { + Seq("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXXXX").foreach { pattern => + Seq( + -58710115316212000L, + -18926315945345679L, + -9463427405253013L, + -244000001L, + 0L, + 99628200102030L, + 1543749753123456L, + 2177456523456789L, + 11858049903010203L).foreach { micros => + DateTimeTestUtils.outstandingTimezones.foreach { timeZone => + val formatter = TimestampFormatter(pattern, timeZone) + val timestamp = formatter.format(micros) + val parsed = formatter.parse(timestamp) + assert(micros === parsed) + } + } + } + } + + test("roundtrip timestamp -> micros -> timestamp using timezones") { + Seq( + "0109-07-20T18:38:03.788000", + "1370-04-01T10:00:54.654321", + "1670-02-11T14:09:54.746987", + "1969-12-31T23:55:55.999999", + "1970-01-01T00:00:00.000000", + "1973-02-27T02:30:00.102030", + "2018-12-02T11:22:33.123456", + "2039-01-01T01:02:03.456789", + "2345-10-07T22:45:03.010203").foreach { timestamp => + DateTimeTestUtils.outstandingTimezones.foreach { timeZone => + val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone) + val micros = formatter.parse(timestamp) + val formatted = formatter.format(micros) + assert(timestamp === formatted) + } + } + } + + test(" case insensitive parsing of am and pm") { + val formatter = TimestampFormatter( + "yyyy MMM dd hh:mm:ss a", + TimeZone.getTimeZone("UTC")) + val micros = formatter.parse("2009 Mar 20 11:30:01 am") + assert(micros === TimeUnit.SECONDS.toMicros( + LocalDateTime.of(2009, 3, 20, 11, 30, 1).toEpochSecond(ZoneOffset.UTC))) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org