This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c8c8c9a22edc [SPARK-51661][SQL] Partitions discovery of TIME column values
c8c8c9a22edc is described below
commit c8c8c9a22edc3ae81ffdb8ebf35e8a677a0a4cca
Author: Max Gekk <[email protected]>
AuthorDate: Mon Mar 31 08:19:53 2025 +0300
[SPARK-51661][SQL] Partitions discovery of TIME column values
### What changes were proposed in this pull request?
In the PR, I propose to infer the `TIME` data type from partition values that
match the pattern `HH:mm:ss[.SSSSSS]`. The fractional-seconds part has a
variable length, so values such as `01:02:03.001`, `23:59:59` and
`12:13:14.123456` all match the pattern.
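As an illustration only (plain `java.time`, not Spark's `TimeFormatter`), a formatter
with a fixed `HH:mm:ss` part and an optional fraction of up to six digits accepts all
of the example values above:
```scala
// Standalone sketch, not Spark code: parse HH:mm:ss with an optional 1..6-digit fraction.
import java.time.LocalTime
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.ChronoField

val fmt = new DateTimeFormatterBuilder()
  .appendPattern("HH:mm:ss")
  .optionalStart()
  .appendFraction(ChronoField.NANO_OF_SECOND, 1, 6, true) // '.' plus 1..6 fractional digits
  .optionalEnd()
  .toFormatter

Seq("01:02:03.001", "23:59:59", "12:13:14.123456")
  .foreach(s => println(LocalTime.parse(s, fmt)))
```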
### Why are the changes needed?
Currently, Spark can save a dataset partitioned by a TIME column and read it
back if the user sets the schema explicitly, but it cannot infer the TIME data
type of the column automatically. For example:
```scala
scala> sql("SELECT time'12:00' AS t, 0 as
id").write.partitionBy("t").parquet("/Users/maxim.gekk/tmp/time_parquet2")
scala>
spark.read.parquet("/Users/maxim.gekk/tmp/time_parquet2").printSchema()
root
|-- id: integer (nullable = true)
|-- t: string (nullable = true)
```
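For reference, the existing workaround mentioned above is to set the schema explicitly.
A rough sketch (the path is reused from the example; the `TimeType(6)` field mirrors
what the writer produced, so this is illustrative rather than the exact session):
```scala
// Sketch of the pre-existing workaround: give the reader an explicit schema so the
// partition column `t` is read as TIME(6) instead of being inferred as STRING.
import org.apache.spark.sql.types.{IntegerType, StructField, StructType, TimeType}

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("t", TimeType(6))))

spark.read.schema(schema).parquet("/Users/maxim.gekk/tmp/time_parquet2").printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- t: time(6) (nullable = true)
```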
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the inferred type is `TIME(6)` instead of `STRING`
for the example above:
```scala
scala> spark.read.parquet("/Users/maxim.gekk/tmp/time_parquet2").printSchema()
root
|-- id: integer (nullable = true)
|-- t: time(6) (nullable = true)
```
### How was this patch tested?
By running the new tests:
```
$ build/sbt "test:testOnly *ParquetV1PartitionDiscoverySuite"
$ build/sbt "test:testOnly *ParquetV2PartitionDiscoverySuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #50453 from MaxGekk/time-partitions-discovery.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../sql/catalyst/analysis/AnsiTypeCoercion.scala | 2 +-
.../spark/sql/catalyst/analysis/TypeCoercion.scala | 2 +-
.../sql/catalyst/analysis/TypeCoercionHelper.scala | 13 +++--
.../execution/datasources/PartitioningUtils.scala | 62 +++++++++++++-------
.../parquet/ParquetPartitionDiscoverySuite.scala | 68 +++++++++++++++++++---
5 files changed, 113 insertions(+), 34 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
index aa977b240007..a98aaf702ace 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
@@ -122,7 +122,7 @@ object AnsiTypeCoercion extends TypeCoercionBase {
Some(widerType)
}
- case (d1: DatetimeType, d2: DatetimeType) => Some(findWiderDateTimeType(d1, d2))
+ case (d1: DatetimeType, d2: DatetimeType) => findWiderDateTimeType(d1, d2)
case (t1: DayTimeIntervalType, t2: DayTimeIntervalType) =>
Some(DayTimeIntervalType(t1.startField.min(t2.startField), t1.endField.max(t2.endField)))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 4769970b5142..f68084803fe7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -90,7 +90,7 @@ object TypeCoercion extends TypeCoercionBase {
val index = numericPrecedence.lastIndexWhere(t => t == t1 || t == t2)
Some(numericPrecedence(index))
- case (d1: DatetimeType, d2: DatetimeType) => Some(findWiderDateTimeType(d1, d2))
+ case (d1: DatetimeType, d2: DatetimeType) => findWiderDateTimeType(d1, d2)
case (t1: DayTimeIntervalType, t2: DayTimeIntervalType) =>
Some(DayTimeIntervalType(t1.startField.min(t2.startField), t1.endField.max(t2.endField)))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionHelper.scala
index b11ec6d8fe67..390ff2f3114d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionHelper.scala
@@ -84,7 +84,8 @@ import org.apache.spark.sql.types.{
StructType,
TimestampNTZType,
TimestampType,
- TimestampTypeExpression
+ TimestampTypeExpression,
+ TimeType
}
abstract class TypeCoercionHelper {
@@ -239,16 +240,18 @@ abstract class TypeCoercionHelper {
}
}
- protected def findWiderDateTimeType(d1: DatetimeType, d2: DatetimeType): DatetimeType =
+ protected def findWiderDateTimeType(d1: DatetimeType, d2: DatetimeType): Option[DatetimeType] =
(d1, d2) match {
+ case (_, _: TimeType) => None
+ case (_: TimeType, _) => None
case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
- TimestampType
+ Some(TimestampType)
case (_: TimestampType, _: TimestampNTZType) | (_: TimestampNTZType, _: TimestampType) =>
- TimestampType
+ Some(TimestampType)
case (_: TimestampNTZType, _: DateType) | (_: DateType, _: TimestampNTZType) =>
- TimestampNTZType
+ Some(TimestampNTZType)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 6f39636b5f5e..1bc4645dfc43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionValueString
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimeFormatter, TimestampFormatter}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
@@ -66,6 +66,7 @@ object PartitionSpec {
object PartitioningUtils extends SQLConfHelper {
+ val timePartitionPattern = "HH:mm:ss[.SSSSSS]"
val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
case class TypedPartValue(value: String, dataType: DataType)
@@ -145,10 +146,11 @@ object PartitioningUtils extends SQLConfHelper {
timestampPartitionPattern,
zoneId,
isParsing = true)
+ val timeFormatter = TimeFormatter(timePartitionPattern, isParsing = true)
// First, we need to parse every partition's path and see if we can find partition values.
val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
- validatePartitionColumns, zoneId, dateFormatter, timestampFormatter)
+ validatePartitionColumns, zoneId, dateFormatter, timestampFormatter, timeFormatter)
}.unzip
// We create pairs of (path -> path's partition value) here
@@ -240,7 +242,8 @@ object PartitioningUtils extends SQLConfHelper {
validatePartitionColumns: Boolean,
zoneId: ZoneId,
dateFormatter: DateFormatter,
- timestampFormatter: TimestampFormatter): (Option[PartitionValues], Option[Path]) = {
+ timestampFormatter: TimestampFormatter,
+ timeFormatter: TimeFormatter): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, TypedPartValue)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
@@ -262,7 +265,7 @@ object PartitioningUtils extends SQLConfHelper {
// Once we get the string, we try to parse it and find the partition column and value.
val maybeColumn =
parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes,
- zoneId, dateFormatter, timestampFormatter)
+ zoneId, dateFormatter, timestampFormatter, timeFormatter)
maybeColumn.foreach(columns += _)
// Now, we determine if we should stop.
@@ -298,7 +301,8 @@ object PartitioningUtils extends SQLConfHelper {
userSpecifiedDataTypes: Map[String, DataType],
zoneId: ZoneId,
dateFormatter: DateFormatter,
- timestampFormatter: TimestampFormatter): Option[(String, TypedPartValue)] = {
+ timestampFormatter: TimestampFormatter,
+ timeFormatter: TimeFormatter): Option[(String, TypedPartValue)] = {
val equalSignIndex = columnSpec.indexOf('=')
if (equalSignIndex == -1) {
None
@@ -319,7 +323,8 @@ object PartitioningUtils extends SQLConfHelper {
typeInference,
zoneId,
dateFormatter,
- timestampFormatter)
+ timestampFormatter,
+ timeFormatter)
}
Some(columnName -> TypedPartValue(rawColumnValue, dataType))
}
@@ -427,22 +432,23 @@ object PartitioningUtils extends SQLConfHelper {
/**
* Converts a string to a [[Literal]] with automatic type inference. Currently only supports
* [[NullType]], [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType]], [[DateType]]
- * [[TimestampType]], and [[StringType]].
+ * [[TimestampType]], [[TimeType]] and [[StringType]].
*
* When resolving conflicts, it follows the table below:
*
- * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
- * | InputA \ InputB    | NullType          | IntegerType       | LongType          | DecimalType(38,0)* | DoubleType | DateType      | TimestampType | StringType |
- * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
- * | NullType           | NullType          | IntegerType       | LongType          | DecimalType(38,0)  | DoubleType | DateType      | TimestampType | StringType |
- * | IntegerType        | IntegerType       | IntegerType       | LongType          | DecimalType(38,0)  | DoubleType | StringType    | StringType    | StringType |
- * | LongType           | LongType          | LongType          | LongType          | DecimalType(38,0)  | StringType | StringType    | StringType    | StringType |
- * | DecimalType(38,0)* | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0)  | StringType | StringType    | StringType    | StringType |
- * | DoubleType         | DoubleType        | DoubleType        | StringType        | StringType         | DoubleType | StringType    | StringType    | StringType |
- * | DateType           | DateType          | StringType        | StringType        | StringType         | StringType | DateType      | TimestampType | StringType |
- * | TimestampType      | TimestampType     | StringType        | StringType        | StringType         | StringType | TimestampType | TimestampType | StringType |
- * | StringType         | StringType        | StringType        | StringType        | StringType         | StringType | StringType    | StringType    | StringType |
- * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
+ * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+------------+
+ * | InputA \ InputB    | NullType          | IntegerType       | LongType          | DecimalType(38,0)* | DoubleType | DateType      | TimestampType | StringType | TimeType   |
+ * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+------------+
+ * | NullType           | NullType          | IntegerType       | LongType          | DecimalType(38,0)  | DoubleType | DateType      | TimestampType | StringType | TimeType   |
+ * | IntegerType        | IntegerType       | IntegerType       | LongType          | DecimalType(38,0)  | DoubleType | StringType    | StringType    | StringType | StringType |
+ * | LongType           | LongType          | LongType          | LongType          | DecimalType(38,0)  | StringType | StringType    | StringType    | StringType | StringType |
+ * | DecimalType(38,0)* | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0)  | StringType | StringType    | StringType    | StringType | StringType |
+ * | DoubleType         | DoubleType        | DoubleType        | StringType        | StringType         | DoubleType | StringType    | StringType    | StringType | StringType |
+ * | DateType           | DateType          | StringType        | StringType        | StringType         | StringType | DateType      | TimestampType | StringType | StringType |
+ * | TimeType           | TimeType          | StringType        | StringType        | StringType         | StringType | StringType    | StringType    | StringType | TimeType   |
+ * | TimestampType      | TimestampType     | StringType        | StringType        | StringType         | StringType | TimestampType | TimestampType | StringType | StringType |
+ * | StringType         | StringType        | StringType        | StringType        | StringType         | StringType | StringType    | StringType    | StringType | StringType |
+ * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+------------+
* Note that, for DecimalType(38,0)*, the table above intentionally does not cover all other
* combinations of scales and precisions because currently we only infer decimal type like
* `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
@@ -453,7 +459,8 @@ object PartitioningUtils extends SQLConfHelper {
typeInference: Boolean,
zoneId: ZoneId,
dateFormatter: DateFormatter,
- timestampFormatter: TimestampFormatter): DataType = {
+ timestampFormatter: TimestampFormatter,
+ timeFormatter: TimeFormatter): DataType = {
val decimalTry = Try {
// `BigDecimal` conversion can fail when the `field` is not a form of number.
val bigDecimal = new JBigDecimal(raw)
@@ -499,6 +506,20 @@ object PartitioningUtils extends SQLConfHelper {
timestampType
}
+ val timeTry = Try {
+ val unescapedRaw = unescapePathName(raw)
+ // try and parse the time, if no exception occurs this is a candidate to be resolved as
+ // TimeType
+ timeFormatter.parse(unescapedRaw)
+ // We need to check that we can cast the raw string since we later can use Cast to get
+ // the partition values with the right DataType (see
+ // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning)
+ val timeValue = Cast(Literal(unescapedRaw), TimeType(), Some(zoneId.getId)).eval()
+ // Disallow TimeType if the cast returned null
+ require(timeValue != null)
+ TimeType()
+ }
+
if (typeInference) {
// First tries integral types
Try({ Integer.parseInt(raw); IntegerType })
@@ -509,6 +530,7 @@ object PartitioningUtils extends SQLConfHelper {
// Then falls back to date/timestamp types
.orElse(timestampTry)
.orElse(dateTry)
+ .orElse(timeTry)
// Then falls back to string
.getOrElse {
if (raw == DEFAULT_PARTITION_NAME) NullType else StringType
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 206fe0b79120..6f5855461fcc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
import java.math.BigInteger
import java.sql.Timestamp
-import java.time.{LocalDateTime, ZoneId, ZoneOffset}
+import java.time.{LocalDateTime, LocalTime, ZoneId, ZoneOffset}
import java.util.Locale
import com.google.common.io.Files
@@ -32,7 +32,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimeFormatter, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
@@ -60,6 +60,7 @@ abstract class ParquetPartitionDiscoverySuite
val df = DateFormatter()
val tf = TimestampFormatter(
timestampPartitionPattern, timeZoneId, isParsing = true)
+ val tif = TimeFormatter(timePartitionPattern, isParsing = true)
protected override def beforeAll(): Unit = {
super.beforeAll()
@@ -73,7 +74,7 @@ abstract class ParquetPartitionDiscoverySuite
test("column type inference") {
def check(raw: String, dataType: DataType, zoneId: ZoneId = timeZoneId): Unit = {
- assert(inferPartitionColumnValue(raw, true, zoneId, df, tf) === dataType)
+ assert(inferPartitionColumnValue(raw, true, zoneId, df, tf, tif) === dataType)
}
check("10", IntegerType)
@@ -214,13 +215,14 @@ abstract class ParquetPartitionDiscoverySuite
test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = {
val actual = parsePartition(new Path(path), true, Set.empty[Path],
- Map.empty, true, timeZoneId, df, tf)._1
+ Map.empty, true, timeZoneId, df, tf, tif)._1
assert(expected === actual)
}
def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] {
- parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, timeZoneId, df, tf)
+ parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, timeZoneId,
+ df, tf, tif)
}.getMessage
assert(message.contains(expected))
@@ -268,7 +270,8 @@ abstract class ParquetPartitionDiscoverySuite
true,
zoneId = timeZoneId,
df,
- tf)._1
+ tf,
+ tif)._1
assert(partitionSpec1.isEmpty)
@@ -281,7 +284,8 @@ abstract class ParquetPartitionDiscoverySuite
true,
zoneId = timeZoneId,
df,
- tf)._1
+ tf,
+ tif)._1
assert(partitionSpec2 ==
Option(PartitionValues(
@@ -1187,6 +1191,56 @@ abstract class ParquetPartitionDiscoverySuite
assert("Partition column name list #[0-1]: col1,
col2".r.findFirstIn(msg).isDefined)
}
}
+
+ test("Infer the TIME data type from partition values") {
+ val df = Seq(
+ (1, LocalTime.parse("00:00:00")),
+ (2, LocalTime.parse("23:00:00.9")),
+ (3, LocalTime.parse("10:11:12.001")),
+ (4, LocalTime.parse("23:59:59.999999"))
+ ).toDF("i", "time")
+
+ withTempPath { path =>
+ df.write.format("parquet").partitionBy("time").save(path.getAbsolutePath)
+ checkAnswer(spark.read.load(path.getAbsolutePath), df)
+ }
+ }
+
+ test("Invalid times should be inferred as STRING in partition inference") {
+ withTempPath { path =>
+ val data = Seq(("1", "10:61", "12:13:14:04", "T00:01:02", "test"))
+ .toDF("id", "time_min", "time_sec", "time_prefix", "data")
+
+ data.write.partitionBy("time_min", "time_sec", "time_prefix").parquet(path.getAbsolutePath)
+ val input = spark.read.parquet(path.getAbsolutePath).select("id",
+ "time_min", "time_sec", "time_prefix", "data")
+
+ assert(DataTypeUtils.sameType(data.schema, input.schema))
+ checkAnswer(input, data)
+ }
+ }
+
+ test("Resolve type conflicts between strings and time in partition column") {
+ val df = Seq(
+ (1, "00:00:00"),
+ (2, "23:59:00.001"),
+ (3, "blah")).toDF("i", "str")
+
+ withTempPath { path =>
+ df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
+ checkAnswer(spark.read.load(path.getAbsolutePath), df)
+ }
+ }
+
+ test("Resolve type conflicts between times and timestamps in partition
column") {
+ withTempPath { path =>
+ val df = Seq((1, "23:59:59"), (2, "00:01:00"), (3, "2015-01-01 00:01:00")).toDF("i", "ts")
+ df.write.format("parquet").partitionBy("ts").save(path.getAbsolutePath)
+ checkAnswer(
+ spark.read.load(path.getAbsolutePath),
+ Row(1, "23:59:59") :: Row(2, "00:01:00") :: Row(3, "2015-01-01 00:01:00") :: Nil)
+ }
+ }
}
class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {