This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 9813b1d  [SPARK-26690] Track query execution and time cost for checkpoints
9813b1d is described below

commit 9813b1d0747ecc8ef8b2abaa185fb4c381271777
Author: Tom van Bussel <tom.vanbus...@databricks.com>
AuthorDate: Thu Jan 24 16:44:39 2019 +0100

    [SPARK-26690] Track query execution and time cost for checkpoints

    ## What changes were proposed in this pull request?

    Checkpoints of DataFrames currently do not show up in the SQL UI. This PR
    fixes that by assigning an execution id to the checkpoint's execution,
    wrapping the checkpoint code in a `withAction` call (a minimal sketch of
    this pattern follows the diff below).

    ## How was this patch tested?

    A unit test was added to DatasetSuite.

    Closes #23636 from tomvanbussel/SPARK-26690.

    Authored-by: Tom van Bussel <tom.vanbus...@databricks.com>
    Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala | 61 +++++++++++-----------
 .../scala/org/apache/spark/sql/DatasetSuite.scala | 17 +++++-
 2 files changed, 46 insertions(+), 32 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 44cada0..32f6234 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -619,40 +619,41 @@ class Dataset[T] private[sql](
    * the caching subsystem
    */
   private def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = {
-    val internalRdd = queryExecution.toRdd.map(_.copy())
-    if (reliableCheckpoint) {
-      internalRdd.checkpoint()
-    } else {
-      internalRdd.localCheckpoint()
-    }
-
-    if (eager) {
-      internalRdd.count()
-    }
-
-    val physicalPlan = queryExecution.executedPlan
+    val actionName = if (reliableCheckpoint) "checkpoint" else "localCheckpoint"
+    withAction(actionName, queryExecution) { physicalPlan =>
+      val internalRdd = physicalPlan.execute().map(_.copy())
+      if (reliableCheckpoint) {
+        internalRdd.checkpoint()
+      } else {
+        internalRdd.localCheckpoint()
+      }
 
-    // Takes the first leaf partitioning whenever we see a `PartitioningCollection`. Otherwise the
-    // size of `PartitioningCollection` may grow exponentially for queries involving deep inner
-    // joins.
-    def firstLeafPartitioning(partitioning: Partitioning): Partitioning = {
-      partitioning match {
-        case p: PartitioningCollection => firstLeafPartitioning(p.partitionings.head)
-        case p => p
+      if (eager) {
+        internalRdd.count()
       }
-    }
-
-    val outputPartitioning = firstLeafPartitioning(physicalPlan.outputPartitioning)
+      // Takes the first leaf partitioning whenever we see a `PartitioningCollection`. Otherwise the
+      // size of `PartitioningCollection` may grow exponentially for queries involving deep inner
+      // joins.
+      def firstLeafPartitioning(partitioning: Partitioning): Partitioning = {
+        partitioning match {
+          case p: PartitioningCollection => firstLeafPartitioning(p.partitionings.head)
+          case p => p
+        }
+      }
 
-    Dataset.ofRows(
-      sparkSession,
-      LogicalRDD(
-        logicalPlan.output,
-        internalRdd,
-        outputPartitioning,
-        physicalPlan.outputOrdering,
-        isStreaming
-      )(sparkSession)).as[T]
+      val outputPartitioning = firstLeafPartitioning(physicalPlan.outputPartitioning)
+
+      Dataset.ofRows(
+        sparkSession,
+        LogicalRDD(
+          logicalPlan.output,
+          internalRdd,
+          outputPartitioning,
+          physicalPlan.outputOrdering,
+          isStreaming
+        )(sparkSession)).as[T]
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 105cec5..8c34e47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -20,15 +20,16 @@ package org.apache.spark.sql
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.sql.catalyst.ScroogeLikeExample
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
 import org.apache.spark.sql.catalyst.util.sideBySide
-import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
+import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -1687,6 +1688,18 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     }
     assert(thrownException.message.contains("Cannot up cast `id` from bigint to tinyint"))
   }
+
+  test("SPARK-26690: checkpoints should be executed with an execution id") {
+    def assertExecutionId: UserDefinedFunction = udf(AssertExecutionId.apply _)
+    spark.range(10).select(assertExecutionId($"id")).localCheckpoint(true)
+  }
+}
+
+object AssertExecutionId {
+  def apply(id: Long): Long = {
+    assert(TaskContext.get().getLocalProperty(SQLExecution.EXECUTION_ID_KEY) != null)
+    id
+  }
 }
 
 case class TestDataUnion(x: Int, y: Int, z: Int)
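
The heart of the fix is the `withAction` wrapper: it allocates a fresh execution
id, makes that id observable for the duration of the action, and records how
long the action took, which is what surfaces the checkpoint (with its time
cost) in the SQL tab. The standalone sketch below only illustrates the shape of
that pattern; it is not Spark's actual implementation (internally the private
`SQLExecution.withNewExecutionId` does the bookkeeping and posts the UI
events), and every name in it is made up for illustration.

// A minimal, self-contained sketch (NOT Spark's private helper) of the
// withAction pattern: allocate a fresh execution id, expose it while the
// action runs, and record the action's wall-clock time.
import java.util.concurrent.atomic.AtomicLong

object WithActionSketch {
  private val idCounter = new AtomicLong(0L)

  // Stand-in for Spark's EXECUTION_ID_KEY local property on the TaskContext.
  private val currentExecutionId = new ThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }

  def executionId: Option[Long] = currentExecutionId.get()

  def withAction[T](name: String)(body: => T): T = {
    val id = idCounter.getAndIncrement()
    val saved = currentExecutionId.get()
    currentExecutionId.set(Some(id))  // make the id observable inside the action
    val start = System.nanoTime()
    try body
    finally {
      currentExecutionId.set(saved)   // restore whatever was set before
      val elapsedMs = (System.nanoTime() - start) / 1000000L
      println(s"action '$name' (executionId=$id) finished in $elapsedMs ms")
    }
  }
}

object WithActionDemo extends App {
  // Mirrors what the new DatasetSuite test asserts for real checkpoints:
  // code running inside the wrapped action sees a non-empty execution id.
  WithActionSketch.withAction("localCheckpoint") {
    assert(WithActionSketch.executionId.isDefined)
  }
  assert(WithActionSketch.executionId.isEmpty)
}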
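
On the user-facing side nothing changes except observability. A hypothetical
spark-shell session exercising both code paths might look as follows; it
assumes the usual `spark` session from the shell, and the checkpoint directory
path is made up.

// After this commit, each call below appears in the SQL tab of the web UI
// as a "checkpoint" or "localCheckpoint" execution, together with its run time.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // required for reliable checkpoints

val df = spark.range(100).selectExpr("id", "id * 2 AS doubled")

val reliable = df.checkpoint()    // eager by default: materializes to the checkpoint dir
val local = df.localCheckpoint()  // eager by default: stores on executors, truncates lineage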