Repository: spark
Updated Branches:
  refs/heads/branch-2.2 e84e9dd54 -> 10e599f69


[SPARK-20588][SQL] Cache TimeZone instances.

## What changes were proposed in this pull request?

Because the method `TimeZone.getTimeZone(String ID)` is synchronized on the 
TimeZone class, concurrent calls to it can become a bottleneck. This shows up 
especially when casting a string value containing time zone information to a 
timestamp value, which goes through `DateTimeUtils.stringToTimestamp()` and 
obtains a TimeZone instance on the spot.

This PR caches the generated TimeZone instances to avoid the synchronization.
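
A minimal sketch of the approach (the names mirror the change to 
`DateTimeUtils.scala` in the diff below; the standalone `TimeZoneCache` object 
here is for illustration only). One caveat: `TimeZone` instances are mutable, 
so sharing cached instances assumes callers never mutate them.

```scala
import java.util.TimeZone
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Hypothetical standalone object for illustration; in the patch this
// logic lives in org.apache.spark.sql.catalyst.util.DateTimeUtils.
object TimeZoneCache {
  private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]

  // An explicit java.util.function.Function wrapper, allocated once and
  // reused, since Scala 2.11 does not treat Scala lambdas as Java SAM types.
  private val computeTimeZone = new JFunction[String, TimeZone] {
    override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
  }

  // Only the first lookup for a given ID enters the synchronized
  // TimeZone.getTimeZone; subsequent lookups hit the concurrent map.
  def getTimeZone(timeZoneId: String): TimeZone =
    computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
}
```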

## How was this patch tested?

Existing tests.
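
For illustration only, and not part of the patch: because 
`ConcurrentHashMap.computeIfAbsent` is atomic per key, concurrent lookups of 
the same ID should all observe a single cached instance (using the 
hypothetical `TimeZoneCache` sketched above).

```scala
import java.util.TimeZone
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Fire 64 concurrent lookups for the same time zone ID.
val lookups = Seq.fill(64)(Future(TimeZoneCache.getTimeZone("America/Los_Angeles")))
val zones: Seq[TimeZone] = Await.result(Future.sequence(lookups), 10.seconds)
// All futures must return the exact same (reference-equal) instance.
assert(zones.forall(_ eq zones.head), "expected a single cached TimeZone instance")
```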

Author: Takuya UESHIN <ues...@databricks.com>

Closes #17933 from ueshin/issues/SPARK-20588.

(cherry picked from commit c8c878a4166415728f6e940504766a099a2f6744)
Signed-off-by: Xiao Li <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10e599f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10e599f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10e599f6

Branch: refs/heads/branch-2.2
Commit: 10e599f69c5c1b1b17d9181b2e93b7e315759b9d
Parents: e84e9dd
Author: Takuya UESHIN <ues...@databricks.com>
Authored: Mon May 15 16:52:22 2017 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Mon May 15 16:52:31 2017 -0700

----------------------------------------------------------------------
 .../catalyst/expressions/datetimeExpressions.scala | 17 ++++++++++-------
 .../spark/sql/catalyst/json/JSONOptions.scala      |  2 +-
 .../sql/catalyst/optimizer/finishAnalysis.scala    |  4 +---
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 17 ++++++++++++++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  4 ++--
 .../spark/sql/execution/QueryExecution.scala       |  3 +--
 .../execution/datasources/PartitioningUtils.scala  |  2 +-
 .../sql/execution/datasources/csv/CSVOptions.scala |  2 +-
 .../sql/execution/streaming/ProgressReporter.scala |  5 +++--
 9 files changed, 34 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/10e599f6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index bb8fd50..6a76058 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -43,7 +43,7 @@ trait TimeZoneAwareExpression extends Expression {
   /** Returns a copy of this expression with the specified timeZoneId. */
   def withTimeZone(timeZoneId: String): TimeZoneAwareExpression
 
-  @transient lazy val timeZone: TimeZone = TimeZone.getTimeZone(timeZoneId.get)
+  @transient lazy val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId.get)
 }
 
 /**
@@ -416,7 +416,7 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
   override def dataType: DataType = IntegerType
 
   @transient private lazy val c = {
-    val c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+    val c = Calendar.getInstance(DateTimeUtils.getTimeZone("UTC"))
     c.setFirstDayOfWeek(Calendar.MONDAY)
     c.setMinimalDaysInFirstWeek(4)
     c
@@ -431,9 +431,10 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
     nullSafeCodeGen(ctx, ev, time => {
       val cal = classOf[Calendar].getName
       val c = ctx.freshName("cal")
+      val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
       ctx.addMutableState(cal, c,
         s"""
-          $c = $cal.getInstance(java.util.TimeZone.getTimeZone("UTC"));
+          $c = $cal.getInstance($dtu.getTimeZone("UTC"));
           $c.setFirstDayOfWeek($cal.MONDAY);
           $c.setMinimalDaysInFirstWeek(4);
          """)
@@ -954,8 +955,9 @@ case class FromUTCTimestamp(left: Expression, right: Expression)
         val tzTerm = ctx.freshName("tz")
         val utcTerm = ctx.freshName("utc")
         val tzClass = classOf[TimeZone].getName
-        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
-        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
+        val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
+        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
         val eval = left.genCode(ctx)
         ev.copy(code = s"""
            |${eval.code}
@@ -1125,8 +1127,9 @@ case class ToUTCTimestamp(left: Expression, right: Expression)
         val tzTerm = ctx.freshName("tz")
         val utcTerm = ctx.freshName("utc")
         val tzClass = classOf[TimeZone].getName
-        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
-        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
+        val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
+        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
         val eval = left.genCode(ctx)
         ev.copy(code = s"""
            |${eval.code}

http://git-wip-us.apache.org/repos/asf/spark/blob/10e599f6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 23ba5ed..7930515 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -70,7 +70,7 @@ private[sql] class JSONOptions(
   val columnNameOfCorruptRecord =
     parameters.getOrElse("columnNameOfCorruptRecord", 
defaultColumnNameOfCorruptRecord)
 
-  val timeZone: TimeZone = TimeZone.getTimeZone(
+  val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.

http://git-wip-us.apache.org/repos/asf/spark/blob/10e599f6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 89e1dc9..af0837e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.util.TimeZone
-
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -55,7 +53,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
       case CurrentDate(Some(timeZoneId)) =>
         currentDates.getOrElseUpdate(timeZoneId, {
           Literal.create(
-            DateTimeUtils.millisToDays(timestamp / 1000L, TimeZone.getTimeZone(timeZoneId)),
+            DateTimeUtils.millisToDays(timestamp / 1000L, DateTimeUtils.getTimeZone(timeZoneId)),
             DateType)
         })
       case CurrentTimestamp() => currentTime

http://git-wip-us.apache.org/repos/asf/spark/blob/10e599f6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index eb6aad5..fc4946d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.util
 import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
 import java.util.{Calendar, Locale, TimeZone}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.{Function => JFunction}
 import javax.xml.bind.DatatypeConverter
 
 import scala.annotation.tailrec
@@ -98,6 +100,15 @@ object DateTimeUtils {
     sdf
   }
 
+  private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
+  private val computeTimeZone = new JFunction[String, TimeZone] {
+    override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
+  }
+
+  def getTimeZone(timeZoneId: String): TimeZone = {
+    computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
+  }
+
   def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = {
     val sdf = new SimpleDateFormat(formatString, Locale.US)
     sdf.setTimeZone(timeZone)
@@ -407,7 +418,7 @@ object DateTimeUtils {
       Calendar.getInstance(timeZone)
     } else {
       Calendar.getInstance(
-        TimeZone.getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
+        getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
     }
     c.set(Calendar.MILLISECOND, 0)
 
@@ -1027,7 +1038,7 @@ object DateTimeUtils {
    * representation in their timezone.
    */
   def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
-    convertTz(time, TimeZoneGMT, TimeZone.getTimeZone(timeZone))
+    convertTz(time, TimeZoneGMT, getTimeZone(timeZone))
   }
 
   /**
@@ -1035,7 +1046,7 @@ object DateTimeUtils {
    * string representation in their timezone.
    */
   def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
-    convertTz(time, TimeZone.getTimeZone(timeZone), TimeZoneGMT)
+    convertTz(time, getTimeZone(timeZone), TimeZoneGMT)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/10e599f6/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 433f6d1..cc04ac4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
 
 import java.io.CharArrayWriter
 import java.sql.{Date, Timestamp}
-import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -247,7 +246,8 @@ class Dataset[T] private[sql](
     val hasMoreData = takeResult.length > numRows
     val data = takeResult.take(numRows)
 
-    lazy val timeZone = TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
+    lazy val timeZone =
+      DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
 
     // For array values, replace Seq and Array with square brackets
     // For cells that are beyond `truncate` characters, replace it with the

http://git-wip-us.apache.org/repos/asf/spark/blob/10e599f6/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8e8210e..2e05e5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.util.TimeZone
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -187,7 +186,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
         DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
       case (t: Timestamp, TimestampType) =>
         DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),
-          TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
+          DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
       case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
       case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
       case (other, tpe) if primitiveTypes.contains(tpe) => other.toString

http://git-wip-us.apache.org/repos/asf/spark/blob/10e599f6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 2d70172..f61c673 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -94,7 +94,7 @@ object PartitioningUtils {
       typeInference: Boolean,
       basePaths: Set[Path],
       timeZoneId: String): PartitionSpec = {
-    parsePartitions(paths, typeInference, basePaths, TimeZone.getTimeZone(timeZoneId))
+    parsePartitions(paths, typeInference, basePaths, DateTimeUtils.getTimeZone(timeZoneId))
   }
 
   private[datasources] def parsePartitions(

http://git-wip-us.apache.org/repos/asf/spark/blob/10e599f6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 62e4c6e..78c16b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -117,7 +117,7 @@ class CSVOptions(
     name.map(CompressionCodecs.getCodecClassName)
   }
 
-  val timeZone: TimeZone = TimeZone.getTimeZone(
+  val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.

http://git-wip-us.apache.org/repos/asf/spark/blob/10e599f6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 693933f..a4e4ca8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.text.SimpleDateFormat
-import java.util.{Date, TimeZone, UUID}
+import java.util.{Date, UUID}
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -82,7 +83,7 @@ trait ProgressReporter extends Logging {
   private var lastNoDataProgressEventTime = Long.MinValue
 
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
-  timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
 
   @volatile
   protected var currentStatus: StreamingQueryStatus = {

