Repository: spark
Updated Branches:
  refs/heads/master 3aa60282c -> 061bb01d9


[SPARK-25248][CORE] Audit barrier Scala APIs for 2.4

## What changes were proposed in this pull request?

I made one pass over the barrier APIs added to Spark 2.4 and updated some 
scopes and docs. I will update the Python docs once the Scala docs are reviewed.

One major issue was that `BarrierTaskContext` extended `TaskContextImpl`, which 
exposes some public methods. Internally there were also several direct 
references to `TaskContextImpl` methods instead of `TaskContext`. This PR moved 
some methods from `TaskContextImpl` to `TaskContext`, keeping them package 
private, and used delegate methods so that `BarrierTaskContext` no longer 
inherits `TaskContextImpl` or exposes unnecessary APIs.
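
To illustrate the refactor: instead of extending the concrete implementation and
inheriting its public surface, the new class wraps a `TaskContext` and forwards
calls to it. A toy sketch of the pattern (the names below are illustrative, not
the actual Spark classes; see the diff for the real code):

```scala
// Toy sketch of the delegation pattern: expose only the abstract parent API and
// forward calls to a wrapped instance, rather than extending the concrete
// implementation and inheriting its public methods.
trait Context {
  def partitionId(): Int
}

class ContextImpl extends Context {
  override def partitionId(): Int = 0
  // Extending ContextImpl directly would also expose helpers like this one.
  def internalBookkeeping(): Unit = ()
}

// Wraps a Context and delegates, so only the Context API is visible to callers.
class BarrierContext(underlying: Context) extends Context {
  override def partitionId(): Int = underlying.partitionId()
}
```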

TODOs:
- [x] scala doc
- [x] python doc (#22261).

Closes #22240 from mengxr/SPARK-25248.

Authored-by: Xiangrui Meng <m...@databricks.com>
Signed-off-by: Xiangrui Meng <m...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/061bb01d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/061bb01d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/061bb01d

Branch: refs/heads/master
Commit: 061bb01d9b99911353e66a90abc3164c467fcae1
Parents: 3aa6028
Author: Xiangrui Meng <m...@databricks.com>
Authored: Tue Sep 4 09:55:53 2018 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Tue Sep 4 09:55:53 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/BarrierTaskContext.scala   | 114 +++++++++++++++----
 .../org/apache/spark/BarrierTaskInfo.scala      |   2 +-
 .../scala/org/apache/spark/TaskContext.scala    |  14 +++
 .../org/apache/spark/TaskContextImpl.scala      |  15 +--
 .../apache/spark/api/python/PythonRunner.scala  |   2 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  10 +-
 .../scala/org/apache/spark/rdd/RDDBarrier.scala |  22 ++--
 .../scala/org/apache/spark/scheduler/Task.scala |  35 +++---
 .../scala/org/apache/spark/util/Utils.scala     |   2 +-
 project/MimaExcludes.scala                      |   7 ++
 .../spark/sql/internal/ReadOnlySQLConf.scala    |   4 +-
 11 files changed, 163 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 3901f96..90a5c41 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -24,25 +24,22 @@ import scala.language.postfixOps
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
 import org.apache.spark.memory.TaskMemoryManager
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.Source
 import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}
-import org.apache.spark.util.{RpcUtils, Utils}
-
-/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
-class BarrierTaskContext(
-    override val stageId: Int,
-    override val stageAttemptNumber: Int,
-    override val partitionId: Int,
-    override val taskAttemptId: Long,
-    override val attemptNumber: Int,
-    override val taskMemoryManager: TaskMemoryManager,
-    localProperties: Properties,
-    @transient private val metricsSystem: MetricsSystem,
-    // The default value is only used in tests.
-    override val taskMetrics: TaskMetrics = TaskMetrics.empty)
-  extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber,
-      taskMemoryManager, localProperties, metricsSystem, taskMetrics) {
+import org.apache.spark.shuffle.FetchFailedException
+import org.apache.spark.util._
+
+/**
+ * :: Experimental ::
+ * A [[TaskContext]] with extra contextual info and tooling for tasks in a barrier stage.
+ * Use [[BarrierTaskContext#get]] to obtain the barrier context for a running barrier task.
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskContext private[spark] (
+    taskContext: TaskContext) extends TaskContext with Logging {
 
  // Find the driver side RPCEndpointRef of the coordinator that handles all the barrier() calls.
   private val barrierCoordinator: RpcEndpointRef = {
@@ -68,7 +65,7 @@ class BarrierTaskContext(
    *
    * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all
    * possible code branches. Otherwise, you may get the job hanging or a SparkException after
-   * timeout. Some examples of misuses listed below:
+   * timeout. Some examples of '''misuses''' are listed below:
    * 1. Only call barrier() function on a subset of all the tasks in the same barrier stage, it
    * shall lead to timeout of the function call.
    * {{{
@@ -146,20 +143,95 @@ class BarrierTaskContext(
 
   /**
    * :: Experimental ::
-   * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId.
+   * Returns [[BarrierTaskInfo]] for all tasks in this barrier stage, ordered by partition ID.
    */
   @Experimental
   @Since("2.4.0")
   def getTaskInfos(): Array[BarrierTaskInfo] = {
-    val addressesStr = localProperties.getProperty("addresses", "")
+    val addressesStr = Option(taskContext.getLocalProperty("addresses")).getOrElse("")
     addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
   }
+
+  // delegate methods
+
+  override def isCompleted(): Boolean = taskContext.isCompleted()
+
+  override def isInterrupted(): Boolean = taskContext.isInterrupted()
+
+  override def isRunningLocally(): Boolean = taskContext.isRunningLocally()
+
+  override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
+    taskContext.addTaskCompletionListener(listener)
+    this
+  }
+
+  override def addTaskFailureListener(listener: TaskFailureListener): this.type = {
+    taskContext.addTaskFailureListener(listener)
+    this
+  }
+
+  override def stageId(): Int = taskContext.stageId()
+
+  override def stageAttemptNumber(): Int = taskContext.stageAttemptNumber()
+
+  override def partitionId(): Int = taskContext.partitionId()
+
+  override def attemptNumber(): Int = taskContext.attemptNumber()
+
+  override def taskAttemptId(): Long = taskContext.taskAttemptId()
+
+  override def getLocalProperty(key: String): String = taskContext.getLocalProperty(key)
+
+  override def taskMetrics(): TaskMetrics = taskContext.taskMetrics()
+
+  override def getMetricsSources(sourceName: String): Seq[Source] = {
+    taskContext.getMetricsSources(sourceName)
+  }
+
+  override private[spark] def killTaskIfInterrupted(): Unit = taskContext.killTaskIfInterrupted()
+
+  override private[spark] def getKillReason(): Option[String] = taskContext.getKillReason()
+
+  override private[spark] def taskMemoryManager(): TaskMemoryManager = {
+    taskContext.taskMemoryManager()
+  }
+
+  override private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
+    taskContext.registerAccumulator(a)
+  }
+
+  override private[spark] def setFetchFailed(fetchFailed: FetchFailedException): Unit = {
+    taskContext.setFetchFailed(fetchFailed)
+  }
+
+  override private[spark] def markInterrupted(reason: String): Unit = {
+    taskContext.markInterrupted(reason)
+  }
+
+  override private[spark] def markTaskFailed(error: Throwable): Unit = {
+    taskContext.markTaskFailed(error)
+  }
+
+  override private[spark] def markTaskCompleted(error: Option[Throwable]): Unit = {
+    taskContext.markTaskCompleted(error)
+  }
+
+  override private[spark] def fetchFailed: Option[FetchFailedException] = {
+    taskContext.fetchFailed
+  }
+
+  override private[spark] def getLocalProperties: Properties = taskContext.getLocalProperties
 }
 
+@Experimental
+@Since("2.4.0")
 object BarrierTaskContext {
   /**
-   * Return the currently active BarrierTaskContext. This can be called inside of user functions to
+   * :: Experimental ::
+   * Returns the currently active BarrierTaskContext. This can be called inside of user functions to
    * access contextual information about running barrier tasks.
    */
+  @Experimental
+  @Since("2.4.0")
  def get(): BarrierTaskContext = TaskContext.get().asInstanceOf[BarrierTaskContext]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala
index ce2653d..347239b 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala
@@ -28,4 +28,4 @@ import org.apache.spark.annotation.{Experimental, Since}
  */
 @Experimental
 @Since("2.4.0")
-class BarrierTaskInfo(val address: String)
+class BarrierTaskInfo private[spark] (val address: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index ceadf10..2b939da 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -221,4 +221,18 @@ abstract class TaskContext extends Serializable {
    */
   private[spark] def setFetchFailed(fetchFailed: FetchFailedException): Unit
 
+  /** Marks the task for interruption, i.e. cancellation. */
+  private[spark] def markInterrupted(reason: String): Unit
+
+  /** Marks the task as failed and triggers the failure listeners. */
+  private[spark] def markTaskFailed(error: Throwable): Unit
+
+  /** Marks the task as completed and triggers the completion listeners. */
+  private[spark] def markTaskCompleted(error: Option[Throwable]): Unit
+
+  /** Optionally returns the stored fetch failure in the task. */
+  private[spark] def fetchFailed: Option[FetchFailedException]
+
+  /** Gets local properties set upstream in the driver. */
+  private[spark] def getLocalProperties: Properties
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 0791fe8..8973042 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -30,6 +30,7 @@ import org.apache.spark.metrics.source.Source
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util._
 
+
 /**
  * A [[TaskContext]] implementation.
  *
@@ -98,9 +99,8 @@ private[spark] class TaskContextImpl(
     this
   }
 
-  /** Marks the task as failed and triggers the failure listeners. */
   @GuardedBy("this")
-  private[spark] def markTaskFailed(error: Throwable): Unit = synchronized {
+  private[spark] override def markTaskFailed(error: Throwable): Unit = synchronized {
     if (failed) return
     failed = true
     failure = error
@@ -109,9 +109,8 @@ private[spark] class TaskContextImpl(
     }
   }
 
-  /** Marks the task as completed and triggers the completion listeners. */
   @GuardedBy("this")
-  private[spark] def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
+  private[spark] override def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
     if (completed) return
     completed = true
     invokeListeners(onCompleteCallbacks, "TaskCompletionListener", error) {
@@ -140,8 +139,7 @@ private[spark] class TaskContextImpl(
     }
   }
 
-  /** Marks the task for interruption, i.e. cancellation. */
-  private[spark] def markInterrupted(reason: String): Unit = {
+  private[spark] override def markInterrupted(reason: String): Unit = {
     reasonIfKilled = Some(reason)
   }
 
@@ -176,8 +174,7 @@ private[spark] class TaskContextImpl(
     this._fetchFailedException = Option(fetchFailed)
   }
 
-  private[spark] def fetchFailed: Option[FetchFailedException] = _fetchFailedException
+  private[spark] override def fetchFailed: Option[FetchFailedException] = _fetchFailedException
 
-  // TODO: shall we publish it and define it in `TaskContext`?
-  private[spark] def getLocalProperties(): Properties = localProperties
+  private[spark] override def getLocalProperties(): Properties = localProperties
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 6c7e863..4c53bc2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -270,7 +270,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         dataOut.writeInt(context.partitionId())
         dataOut.writeInt(context.attemptNumber())
         dataOut.writeLong(context.taskAttemptId())
-        val localProps = context.asInstanceOf[TaskContextImpl].getLocalProperties.asScala
+        val localProps = context.getLocalProperties.asScala
         dataOut.writeInt(localProps.size)
         localProps.foreach { case (k, v) =>
           PythonRDD.writeUTF(k, dataOut)

http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 374b846..ea895bb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1649,7 +1649,15 @@ abstract class RDD[T: ClassTag](
 
   /**
    * :: Experimental ::
-   * Indicates that Spark must launch the tasks together for the current stage.
+   * Marks the current stage as a barrier stage, where Spark must launch all tasks together.
+   * In case of a task failure, instead of only restarting the failed task, Spark will abort the
+   * entire stage and re-launch all tasks for this stage.
+   * The barrier execution mode feature is experimental and it only handles limited scenarios.
+   * Please read the linked SPIP and design docs to understand the limitations and future plans.
+   * @return an [[RDDBarrier]] instance that provides actions within a barrier stage
+   * @see [[org.apache.spark.BarrierTaskContext]]
+   * @see <a href="https://jira.apache.org/jira/browse/SPARK-24374">SPIP: Barrier Execution Mode</a>
+   * @see <a href="https://jira.apache.org/jira/browse/SPARK-24582">Design Doc</a>
    */
   @Experimental
   @Since("2.4.0")
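
(For orientation, a minimal usage sketch of the barrier API documented above; it
assumes an existing SparkContext `sc`, and everything other than the Spark 2.4
barrier APIs shown in this commit is illustrative only:)

    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    val perPartition = rdd.barrier().mapPartitions { iter =>
      val context = org.apache.spark.BarrierTaskContext.get()
      context.barrier()                                      // every task in the stage waits here
      val addresses = context.getTaskInfos().map(_.address)  // task infos are ordered by partition ID
      Iterator((addresses.length, iter.sum))
    }
    perPartition.collect()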

http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
index b399bf9..42802f7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
@@ -22,15 +22,23 @@ import scala.reflect.ClassTag
 import org.apache.spark.TaskContext
 import org.apache.spark.annotation.{Experimental, Since}
 
-/** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */
-class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
+/**
+ * :: Experimental ::
+ * Wraps an RDD in a barrier stage, which forces Spark to launch tasks of this stage together.
+ * [[org.apache.spark.rdd.RDDBarrier]] instances are created by
+ * [[org.apache.spark.rdd.RDD#barrier]].
+ */
+@Experimental
+@Since("2.4.0")
+class RDDBarrier[T: ClassTag] private[spark] (rdd: RDD[T]) {
 
   /**
    * :: Experimental ::
-   * Generate a new barrier RDD by applying a function to each partitions of the prev RDD.
-   *
-   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
-   * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys.
+   * Returns a new RDD by applying a function to each partition of the wrapped RDD,
+   * where tasks are launched together in a barrier stage.
+   * The interface is the same as [[org.apache.spark.rdd.RDD#mapPartitions]].
+   * Please see the API doc there.
+   * @see [[org.apache.spark.BarrierTaskContext]]
    */
   @Experimental
   @Since("2.4.0")
@@ -46,5 +54,5 @@ class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
     )
   }
 
-  /** TODO extra conf(e.g. timeout) */
+  // TODO: [SPARK-25247] add extra conf to RDDBarrier, e.g., timeout.
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 11f85fd..eb059f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -82,28 +82,21 @@ private[spark] abstract class Task[T](
     SparkEnv.get.blockManager.registerTask(taskAttemptId)
    // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether
     // the stage is barrier.
+    val taskContext = new TaskContextImpl(
+      stageId,
+      stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
+      partitionId,
+      taskAttemptId,
+      attemptNumber,
+      taskMemoryManager,
+      localProperties,
+      metricsSystem,
+      metrics)
+
     context = if (isBarrier) {
-      new BarrierTaskContext(
-        stageId,
-        stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
-        partitionId,
-        taskAttemptId,
-        attemptNumber,
-        taskMemoryManager,
-        localProperties,
-        metricsSystem,
-        metrics)
+      new BarrierTaskContext(taskContext)
     } else {
-      new TaskContextImpl(
-        stageId,
-        stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
-        partitionId,
-        taskAttemptId,
-        attemptNumber,
-        taskMemoryManager,
-        localProperties,
-        metricsSystem,
-        metrics)
+      taskContext
     }
 
     TaskContext.setTaskContext(context)
@@ -180,7 +173,7 @@ private[spark] abstract class Task[T](
   var epoch: Long = -1
 
   // Task context, to be initialized in run().
-  @transient var context: TaskContextImpl = _
+  @transient var context: TaskContext = _
 
  // The actual Thread on which the task is running, if any. Initialized in run().
   @volatile @transient private var taskThread: Thread = _

http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e6646bd..935bff9 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1387,7 +1387,7 @@ private[spark] object Utils extends Logging {
         originalThrowable = cause
         try {
           logError("Aborting task", originalThrowable)
-          TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
+          TaskContext.get().markTaskFailed(originalThrowable)
           catchBlock
         } catch {
           case t: Throwable =>

http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 62f8b1a..45cc5cc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,13 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+    // [SPARK-25248] add package private methods to TaskContext
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.markTaskFailed"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.markInterrupted"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.fetchFailed"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.markTaskCompleted"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperties"),
+
     // [SPARK-10697][ML] Add lift to Association rules
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.fpm.FPGrowthModel.this"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.fpm.AssociationRules#Rule.this"),

http://git-wip-us.apache.org/repos/asf/spark/blob/061bb01d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala
index 19f6723..ef4b339 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.internal
 
 import java.util.{Map => JMap}
 
-import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.config.{ConfigEntry, ConfigProvider, ConfigReader}
 
 /**
@@ -29,7 +29,7 @@ import org.apache.spark.internal.config.{ConfigEntry, ConfigProvider, ConfigRead
 class ReadOnlySQLConf(context: TaskContext) extends SQLConf {
 
   @transient override val settings: JMap[String, String] = {
-    context.asInstanceOf[TaskContextImpl].getLocalProperties().asInstanceOf[JMap[String, String]]
+    context.getLocalProperties.asInstanceOf[JMap[String, String]]
   }
 
   @transient override protected val reader: ConfigReader = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
