This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new cbe75bb [SPARK-31680][SQL][TESTS] Support Java 8 datetime types by Random data generator cbe75bb is described below commit cbe75bb8879ec408088eaf6944284a893bb63c92 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Tue May 12 14:05:31 2020 +0000 [SPARK-31680][SQL][TESTS] Support Java 8 datetime types by Random data generator ### What changes were proposed in this pull request? Generates java.time.LocalDate/java.time.Instant for DateType/TimestampType, respectively, in `RandomDataGenerator.forType` when the SQL config `spark.sql.datetime.java8API.enabled` is set to `true`. ### Why are the changes needed? To improve test coverage and to check java.time.LocalDate/java.time.Instant values in round-trip tests. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running the modified test suites `RowEncoderSuite`, `RandomDataGeneratorSuite` and `HadoopFsRelationTest`. Closes #28502 from MaxGekk/random-java8-datetime. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit a3fafddf390fd180047a0b9ef46f052a9b6813e0) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../org/apache/spark/sql/RandomDataGenerator.scala | 105 +++++++++++++-------- .../spark/sql/RandomDataGeneratorSuite.scala | 32 ++++--- .../sql/catalyst/encoders/RowEncoderSuite.scala | 36 +++---- .../spark/sql/sources/HadoopFsRelationTest.scala | 75 ++++++++------- 4 files changed, 146 insertions(+), 102 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala index cf8d772..6a5bdc4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala @@ -19,13 +19,15 @@ package org.apache.spark.sql import java.math.MathContext import java.sql.{Date, Timestamp} +import java.time.{Instant, LocalDate, LocalDateTime, ZoneId} import scala.collection.mutable import scala.util.{Random, Try} import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY +import org.apache.spark.sql.catalyst.util.DateTimeConstants.{MICROS_PER_MILLIS, MILLIS_PER_DAY} import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval /** @@ -162,7 +164,7 @@ object RandomDataGenerator { }) case BooleanType => Some(() => rand.nextBoolean()) case DateType => - def uniformDateRand(rand: Random): java.sql.Date = { + def uniformDaysRand(rand: Random): Int = { var milliseconds = rand.nextLong() % 253402329599999L // -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT // for "0001-01-01 00:00:00.000000". 
We need to find a @@ -172,27 +174,37 @@ object RandomDataGenerator { // January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999". milliseconds = rand.nextLong() % 253402329599999L } - val date = DateTimeUtils.toJavaDate((milliseconds / MILLIS_PER_DAY).toInt) - // The generated `date` is based on the hybrid calendar Julian + Gregorian since - // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used - // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `date` to - // a local date in Proleptic Gregorian calendar to satisfy this requirement. - // Some years are leap years in Julian calendar but not in Proleptic Gregorian calendar. - // As the consequence of that, 29 February of such years might not exist in Proleptic - // Gregorian calendar. When this happens, we shift the date by one day. - Try { date.toLocalDate; date }.getOrElse(new Date(date.getTime + MILLIS_PER_DAY)) + (milliseconds / MILLIS_PER_DAY).toInt + } + val specialDates = Seq( + "0001-01-01", // the fist day of Common Era + "1582-10-15", // the cutover date from Julian to Gregorian calendar + "1970-01-01", // the epoch date + "9999-12-31" // the last supported date according to SQL standard + ) + if (SQLConf.get.getConf(SQLConf.DATETIME_JAVA8API_ENABLED)) { + randomNumeric[LocalDate]( + rand, + (rand: Random) => LocalDate.ofEpochDay(uniformDaysRand(rand)), + specialDates.map(LocalDate.parse)) + } else { + randomNumeric[java.sql.Date]( + rand, + (rand: Random) => { + val date = DateTimeUtils.toJavaDate(uniformDaysRand(rand)) + // The generated `date` is based on the hybrid calendar Julian + Gregorian since + // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used + // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `date` to + // a local date in Proleptic Gregorian calendar to satisfy this requirement. Some + // years are leap years in Julian calendar but not in Proleptic Gregorian calendar. 
+ // As the consequence of that, 29 February of such years might not exist in Proleptic + // Gregorian calendar. When this happens, we shift the date by one day. + Try { date.toLocalDate; date }.getOrElse(new Date(date.getTime + MILLIS_PER_DAY)) + }, + specialDates.map(java.sql.Date.valueOf)) } - randomNumeric[java.sql.Date]( - rand, - uniformDateRand, - Seq( - "0001-01-01", // the fist day of Common Era - "1582-10-15", // the cutover date from Julian to Gregorian calendar - "1970-01-01", // the epoch date - "9999-12-31" // the last supported date according to SQL standard - ).map(java.sql.Date.valueOf)) case TimestampType => - def uniformTimestampRand(rand: Random): java.sql.Timestamp = { + def uniformMicorsRand(rand: Random): Long = { var milliseconds = rand.nextLong() % 253402329599999L // -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT // for "0001-01-01 00:00:00.000000". We need to find a @@ -202,26 +214,39 @@ object RandomDataGenerator { // January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999". milliseconds = rand.nextLong() % 253402329599999L } - // DateTimeUtils.toJavaTimestamp takes microsecond. - val ts = DateTimeUtils.toJavaTimestamp(milliseconds * 1000) - // The generated `ts` is based on the hybrid calendar Julian + Gregorian since - // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used - // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `ts` to - // a local timestamp in Proleptic Gregorian calendar to satisfy this requirement. - // Some years are leap years in Julian calendar but not in Proleptic Gregorian calendar. - // As the consequence of that, 29 February of such years might not exist in Proleptic - // Gregorian calendar. When this happens, we shift the timestamp `ts` by one day. 
- Try { ts.toLocalDateTime; ts }.getOrElse(new Timestamp(ts.getTime + MILLIS_PER_DAY)) + milliseconds * MICROS_PER_MILLIS + } + val specialTs = Seq( + "0001-01-01 00:00:00", // the fist timestamp of Common Era + "1582-10-15 23:59:59", // the cutover date from Julian to Gregorian calendar + "1970-01-01 00:00:00", // the epoch timestamp + "9999-12-31 23:59:59" // the last supported timestamp according to SQL standard + ) + if (SQLConf.get.getConf(SQLConf.DATETIME_JAVA8API_ENABLED)) { + randomNumeric[Instant]( + rand, + (rand: Random) => DateTimeUtils.microsToInstant(uniformMicorsRand(rand)), + specialTs.map { s => + val ldt = LocalDateTime.parse(s.replace(" ", "T")) + ldt.atZone(ZoneId.systemDefault()).toInstant + }) + } else { + randomNumeric[java.sql.Timestamp]( + rand, + (rand: Random) => { + // DateTimeUtils.toJavaTimestamp takes microsecond. + val ts = DateTimeUtils.toJavaTimestamp(uniformMicorsRand(rand)) + // The generated `ts` is based on the hybrid calendar Julian + Gregorian since + // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used + // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `ts` to + // a local timestamp in Proleptic Gregorian calendar to satisfy this requirement. Some + // years are leap years in Julian calendar but not in Proleptic Gregorian calendar. + // As the consequence of that, 29 February of such years might not exist in Proleptic + // Gregorian calendar. When this happens, we shift the timestamp `ts` by one day. 
+ Try { ts.toLocalDateTime; ts }.getOrElse(new Timestamp(ts.getTime + MILLIS_PER_DAY)) + }, + specialTs.map(java.sql.Timestamp.valueOf)) } - randomNumeric[java.sql.Timestamp]( - rand, - uniformTimestampRand, - Seq( - "0001-01-01 00:00:00", // the fist timestamp of Common Era - "1582-10-15 23:59:59", // the cutover date from Julian to Gregorian calendar - "1970-01-01 00:00:00", // the epoch timestamp - "9999-12-31 23:59:59.999" // the last supported timestamp according to SQL standard - ).map(java.sql.Timestamp.valueOf)) case CalendarIntervalType => Some(() => { val months = rand.nextInt(1000) val days = rand.nextInt(10000) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala index 3e62ca0..cb335e5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala @@ -24,30 +24,36 @@ import scala.util.Random import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** * Tests of [[RandomDataGenerator]]. */ -class RandomDataGeneratorSuite extends SparkFunSuite { +class RandomDataGeneratorSuite extends SparkFunSuite with SQLHelper { /** * Tests random data generation for the given type by using it to generate random values then * converting those values into their Catalyst equivalents using CatalystTypeConverters. 
*/ def testRandomDataGeneration(dataType: DataType, nullable: Boolean = true): Unit = { - val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType) - val generator = RandomDataGenerator.forType(dataType, nullable, new Random(33)).getOrElse { - fail(s"Random data generator was not defined for $dataType") - } - if (nullable) { - assert(Iterator.fill(100)(generator()).contains(null)) - } else { - assert(!Iterator.fill(100)(generator()).contains(null)) - } - for (_ <- 1 to 10) { - val generatedValue = generator() - toCatalyst(generatedValue) + Seq(false, true).foreach { java8Api => + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { + val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType) + val generator = RandomDataGenerator.forType(dataType, nullable, new Random(33)).getOrElse { + fail(s"Random data generator was not defined for $dataType") + } + if (nullable) { + assert(Iterator.fill(100)(generator()).contains(null)) + } else { + assert(!Iterator.fill(100)(generator()).contains(null)) + } + for (_ <- 1 to 10) { + val generatedValue = generator() + toCatalyst(generatedValue) + } + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index c1158e0..fd24f05 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -377,23 +377,27 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { private def encodeDecodeTest(schema: StructType): Unit = { test(s"encode/decode: ${schema.simpleString}") { - val encoder = RowEncoder(schema).resolveAndBind() - val inputGenerator = RandomDataGenerator.forType(schema, nullable = false).get - - var input: Row = null - try { - for (_ <- 1 to 5) { - input = 
inputGenerator.apply().asInstanceOf[Row] - val convertedBack = roundTrip(encoder, input) - assert(input == convertedBack) + Seq(false, true).foreach { java8Api => + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { + val encoder = RowEncoder(schema).resolveAndBind() + val inputGenerator = RandomDataGenerator.forType(schema, nullable = false).get + + var input: Row = null + try { + for (_ <- 1 to 5) { + input = inputGenerator.apply().asInstanceOf[Row] + val convertedBack = roundTrip(encoder, input) + assert(input == convertedBack) + } + } catch { + case e: Exception => + fail( + s""" + |schema: ${schema.simpleString} + |input: ${input} + """.stripMargin, e) + } } - } catch { - case e: Exception => - fail( - s""" - |schema: ${schema.simpleString} - |input: ${input} - """.stripMargin, e) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 9cf1719..42b6862 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -145,40 +145,49 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes val seed = System.nanoTime() withClue(s"Random data generated with the seed: ${seed}") { - val dataGenerator = RandomDataGenerator.forType( - dataType = dataType, - nullable = true, - new Random(seed) - ).getOrElse { - fail(s"Failed to create data generator for schema $dataType") + val java8ApiConfValues = if (dataType == DateType || dataType == TimestampType) { + Seq(false, true) + } else { + Seq(false) + } + java8ApiConfValues.foreach { java8Api => + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { + val dataGenerator = RandomDataGenerator.forType( + dataType = dataType, + nullable = true, + new Random(seed) + ).getOrElse { + fail(s"Failed to create data 
generator for schema $dataType") + } + + // Create a DF for the schema with random data. The index field is used to sort the + // DataFrame. This is a workaround for SPARK-10591. + val schema = new StructType() + .add("index", IntegerType, nullable = false) + .add("col", dataType, nullable = true) + val rdd = + spark.sparkContext.parallelize((1 to 20).map(i => Row(i, dataGenerator()))) + val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) + + df.write + .mode("overwrite") + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .options(extraOptions) + .save(path) + + val loadedDF = spark + .read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .schema(df.schema) + .options(extraOptions) + .load(path) + .orderBy("index") + + checkAnswer(loadedDF, df) + } } - - // Create a DF for the schema with random data. The index field is used to sort the - // DataFrame. This is a workaround for SPARK-10591. - val schema = new StructType() - .add("index", IntegerType, nullable = false) - .add("col", dataType, nullable = true) - val rdd = - spark.sparkContext.parallelize((1 to 20).map(i => Row(i, dataGenerator()))) - val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) - - df.write - .mode("overwrite") - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .options(extraOptions) - .save(path) - - val loadedDF = spark - .read - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .schema(df.schema) - .options(extraOptions) - .load(path) - .orderBy("index") - - checkAnswer(loadedDF, df) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org