This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9813b1d  [SPARK-26690] Track query execution and time cost for checkpoints
9813b1d is described below

commit 9813b1d0747ecc8ef8b2abaa185fb4c381271777
Author: Tom van Bussel <tom.vanbus...@databricks.com>
AuthorDate: Thu Jan 24 16:44:39 2019 +0100

    [SPARK-26690] Track query execution and time cost for checkpoints
    
    ## What changes were proposed in this pull request?
    
    Checkpoints of DataFrames currently do not show up in the SQL UI. This PR
    fixes that by setting an execution id for the execution of the checkpoint,
    wrapping the checkpoint code in a `withAction` call.
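    
    For example (a minimal sketch, not part of this patch, assuming a
    `SparkSession` named `spark`): an eager local checkpoint such as the one
    below now runs under an SQL execution id, so the job it triggers and its
    time cost show up in the SQL tab of the web UI.
    
    ```scala
    // Eager local checkpoint (localCheckpoint() defaults to eager = true).
    // After this change the underlying job is tagged with an execution id,
    // so it is tracked as a query in the SQL UI.
    val checkpointed = spark.range(10).localCheckpoint()
    ```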
    
    ## How was this patch tested?
    
    A unit test was added to DatasetSuite.
    
    Closes #23636 from tomvanbussel/SPARK-26690.
    
    Authored-by: Tom van Bussel <tom.vanbus...@databricks.com>
    Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 61 +++++++++++-----------
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 17 +++++-
 2 files changed, 46 insertions(+), 32 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 44cada0..32f6234 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -619,40 +619,41 @@ class Dataset[T] private[sql](
    *                           the caching subsystem
    */
   private def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = {
-    val internalRdd = queryExecution.toRdd.map(_.copy())
-    if (reliableCheckpoint) {
-      internalRdd.checkpoint()
-    } else {
-      internalRdd.localCheckpoint()
-    }
-
-    if (eager) {
-      internalRdd.count()
-    }
-
-    val physicalPlan = queryExecution.executedPlan
+    val actionName = if (reliableCheckpoint) "checkpoint" else "localCheckpoint"
+    withAction(actionName, queryExecution) { physicalPlan =>
+      val internalRdd = physicalPlan.execute().map(_.copy())
+      if (reliableCheckpoint) {
+        internalRdd.checkpoint()
+      } else {
+        internalRdd.localCheckpoint()
+      }
 
-    // Takes the first leaf partitioning whenever we see a `PartitioningCollection`. Otherwise the
-    // size of `PartitioningCollection` may grow exponentially for queries involving deep inner
-    // joins.
-    def firstLeafPartitioning(partitioning: Partitioning): Partitioning = {
-      partitioning match {
-      case p: PartitioningCollection => firstLeafPartitioning(p.partitionings.head)
-        case p => p
+      if (eager) {
+        internalRdd.count()
       }
-    }
 
-    val outputPartitioning = firstLeafPartitioning(physicalPlan.outputPartitioning)
+      // Takes the first leaf partitioning whenever we see a `PartitioningCollection`. Otherwise the
+      // size of `PartitioningCollection` may grow exponentially for queries involving deep inner
+      // joins.
+      def firstLeafPartitioning(partitioning: Partitioning): Partitioning = {
+        partitioning match {
+          case p: PartitioningCollection => firstLeafPartitioning(p.partitionings.head)
+          case p => p
+        }
+      }
 
-    Dataset.ofRows(
-      sparkSession,
-      LogicalRDD(
-        logicalPlan.output,
-        internalRdd,
-        outputPartitioning,
-        physicalPlan.outputOrdering,
-        isStreaming
-      )(sparkSession)).as[T]
+      val outputPartitioning = firstLeafPartitioning(physicalPlan.outputPartitioning)
+
+      Dataset.ofRows(
+        sparkSession,
+        LogicalRDD(
+          logicalPlan.output,
+          internalRdd,
+          outputPartitioning,
+          physicalPlan.outputOrdering,
+          isStreaming
+        )(sparkSession)).as[T]
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 105cec5..8c34e47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -20,15 +20,16 @@ package org.apache.spark.sql
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.sql.catalyst.ScroogeLikeExample
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
 import org.apache.spark.sql.catalyst.util.sideBySide
-import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
+import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -1687,6 +1688,18 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     }
     assert(thrownException.message.contains("Cannot up cast `id` from bigint to tinyint"))
   }
+
+  test("SPARK-26690: checkpoints should be executed with an execution id") {
+    def assertExecutionId: UserDefinedFunction = udf(AssertExecutionId.apply _)
+    spark.range(10).select(assertExecutionId($"id")).localCheckpoint(true)
+  }
+}
+
+object AssertExecutionId {
+  def apply(id: Long): Long = {
+    assert(TaskContext.get().getLocalProperty(SQLExecution.EXECUTION_ID_KEY) != null)
+    id
+  }
 }
 
 case class TestDataUnion(x: Int, y: Int, z: Int)

