[
https://issues.apache.org/jira/browse/SPARK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896061#comment-15896061
]
gagan taneja commented on SPARK-19145:
--------------------------------------
I am suggesting the following changes.
First, introduce a function to test for a perfect cast, similar to:
private def perfectCast(expr: Literal, dataType: DataType): Boolean = {
  expr match {
    // A cast that cannot parse the string evaluates to null rather than
    // throwing, so check the result as well as catching any exception.
    case Literal(_, StringType) =>
      scala.util.Try(Cast(expr, dataType).eval(null)).toOption.exists(_ != null)
    case _ => false
  }
}
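A quick spark-shell sanity check of the helper (against Spark 2.1, where Cast
takes no time-zone argument; the sample strings are only illustrative):

import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.TimestampType

// A parseable timestamp string evaluates to the epoch microseconds (a Long),
// so perfectCast would return true...
Cast(Literal("2017-01-02 19:53:51"), TimestampType).eval(null)   // non-null Long
// ...while an unparseable string evaluates to null, so it would return false.
Cast(Literal("not a timestamp"), TimestampType).eval(null)       // null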
Then make the string promotion conditional on whether the input string can be perfectly cast:
// We should cast all relative timestamp/date/string comparisons into string
// comparisons. This behaves as a user would expect because timestamp strings
// sort lexicographically,
// i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true.
// Where the string is an exact cast, we should instead cast the String to the
// timestamp type, which speeds up execution,
// i.e. TimeStamp(2013-01-01 00:00 ...) < '2017-01-02 19:53:51' would translate
// to TimeStamp(2013-01-01 00:00 ...) < Timestamp(2017-01-02 19:53:51).
case p @ BinaryComparison(left @ Literal(_, StringType), right @ DateType())
    if acceptedDataTypes.contains(right.dataType) &&
      perfectCast(left, right.dataType) =>
  p.makeCopy(Array(Cast(left, right.dataType), right))
case p @ BinaryComparison(left @ StringType(), right @ DateType())
    if acceptedDataTypes.contains(right.dataType) =>
  p.makeCopy(Array(left, Cast(right, StringType)))
case p @ BinaryComparison(left @ DateType(), right @ Literal(_, StringType))
    if acceptedDataTypes.contains(left.dataType) &&
      perfectCast(right, left.dataType) =>
  p.makeCopy(Array(left, Cast(right, left.dataType)))
case p @ BinaryComparison(left @ DateType(), right @ StringType())
    if acceptedDataTypes.contains(left.dataType) =>
  p.makeCopy(Array(Cast(left, StringType), right))
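acceptedDataTypes is not defined in the snippet above; a minimal sketch of
what I have in mind, assuming it simply whitelists the date/time types the
rule should rewrite:

import org.apache.spark.sql.types.{DataType, DateType, TimestampType}

// Hypothetical whitelist assumed by the guards above: attempt the
// literal-side cast only for date/timestamp comparisons.
private val acceptedDataTypes: Seq[DataType] = Seq(DateType, TimestampType)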
> Timestamp to String casting is slowing the query significantly
> --------------------------------------------------------------
>
> Key: SPARK-19145
> URL: https://issues.apache.org/jira/browse/SPARK-19145
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: gagan taneja
>
> I have a time series table with a timestamp column.
> The following query
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 50000
> is significantly SLOWER than
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','YYYY-MM-DD
> HH24:MI:SS-0800')
> AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','YYYY-MM-DD
> HH24:MI:SS-0800') LIMIT 50000
> After investigation I found that in the first query the time column is cast
> to String before the filter is applied.
> However, in the second query no such cast is performed and the filter is on
> a long value.
> Below is the physical plan generated for the slower execution, followed by
> the physical plan for the faster execution.
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 50000
> == Physical Plan ==
> CollectLimit 50000
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3290L])
> +- Exchange SinglePartition
> +- *HashAggregate(keys=[], functions=[partial_count(1)],
> output=[count#3339L])
> +- *Project
> +- *Filter ((isnotnull(time#3314) && (cast(time#3314 as string)
> >= 2017-01-02 19:53:51)) && (cast(time#3314 as string) <= 2017-01-09
> 19:53:51))
> +- *FileScan parquet default.cstat[time#3314] Batched: true,
> Format: Parquet, Location:
> InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat],
> PartitionFilters: [], PushedFilters: [IsNotNull(time)], ReadSchema:
> struct<time:timestamp>
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','YYYY-MM-DD
> HH24:MI:SS-0800')
> AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','YYYY-MM-DD
> HH24:MI:SS-0800') LIMIT 50000
> == Physical Plan ==
> CollectLimit 50000
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3238L])
> +- Exchange SinglePartition
> +- *HashAggregate(keys=[], functions=[partial_count(1)],
> output=[count#3287L])
> +- *Project
> +- *Filter ((isnotnull(time#3262) && (time#3262 >=
> 1483404831000000)) && (time#3262 <= 1484009631000000))
> +- *FileScan parquet default.cstat[time#3262] Batched: true,
> Format: Parquet, Location:
> InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat],
> PartitionFilters: [], PushedFilters: [IsNotNull(time),
> GreaterThanOrEqual(time,2017-01-02 19:53:51.0),
> LessThanOrEqual(time,2017-01-09..., ReadSchema: struct<time:timestamp>
> In Impala both queries run efficiently with no performance difference.
> Spark should be able to parse the date string and convert it to a
> Long/Timestamp while generating the optimized logical plan, so that both
> queries would have similar performance.
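Until such a rule lands, a workaround from the DataFrame API is to cast the
literal side explicitly, so the column itself is never cast to string (a
sketch assuming a spark-shell session and the table/column names from the
report):

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.TimestampType

// Casting the literals (not the column) keeps the filter on the raw
// timestamp column, so it can be pushed down to Parquet as in the fast plan.
val count = spark.table("default.table")
  .where(col("time") >= lit("2017-01-02 19:53:51").cast(TimestampType) &&
         col("time") <= lit("2017-01-09 19:53:51").cast(TimestampType))
  .count()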
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]