This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new af6d04b  [SPARK-36095][CORE] Grouping exception in core/rdd
af6d04b is described below

commit af6d04b65ca45c0d8e1aacc93da7be271b586506
Author: dgd-contributor <dgd_contribu...@viettel.com.vn>
AuthorDate: Wed Jul 28 22:01:26 2021 +0800

    [SPARK-36095][CORE] Grouping exception in core/rdd
    
    ### What changes were proposed in this pull request?
    This PR groups the exception messages in core/src/main/scala/org/apache/spark/rdd into the new SparkCoreErrors object.
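    
    For illustration, a minimal sketch of the pattern (mirroring the rddBlockNotFoundError case from the diff below, not the full object): error construction moves into a central SparkCoreErrors object that returns a Throwable, and the call sites simply throw what it returns.
    
    ```scala
    import org.apache.spark.storage.BlockId
    
    // Central error "factory": builds (but does not throw) the exception.
    object SparkCoreErrors {
      def rddBlockNotFoundError(blockId: BlockId, id: Int): Throwable = {
        new Exception(s"Could not compute split, block $blockId of RDD $id not found")
      }
    }
    
    // Call site, e.g. BlockRDD.compute:
    //   throw SparkCoreErrors.rddBlockNotFoundError(blockId, id)
    ```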
    
    ### Why are the changes needed?
    It helps standardize error messages and makes them easier to maintain.
    
    ### Does this PR introduce _any_ user-facing change?
    No. Error messages remain unchanged.
    
    ### How was this patch tested?
    No new tests - all existing tests pass to make sure this doesn't break any existing behavior.
    
    Closes #33317 from dgd-contributor/SPARK-36095_GroupExceptionCoreRdd.
    
    Lead-authored-by: dgd-contributor <dgd_contribu...@viettel.com.vn>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/errors/SparkCoreErrors.scala  | 144 +++++++++++++++++++++
 .../main/scala/org/apache/spark/rdd/BlockRDD.scala |   6 +-
 .../org/apache/spark/rdd/DoubleRDDFunctions.scala  |   4 +-
 .../main/scala/org/apache/spark/rdd/EmptyRDD.scala |   3 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala     |   4 +-
 .../org/apache/spark/rdd/LocalCheckpointRDD.scala  |   9 +-
 .../scala/org/apache/spark/rdd/NewHadoopRDD.scala  |   3 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala    |  15 ++-
 .../main/scala/org/apache/spark/rdd/PipedRDD.scala |   3 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala |  27 ++--
 .../apache/spark/rdd/ReliableCheckpointRDD.scala   |  17 +--
 .../spark/rdd/ReliableRDDCheckpointData.scala      |   3 +-
 12 files changed, 186 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
new file mode 100644
index 0000000..a29f375
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.errors
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.storage.{BlockId, RDDBlockId}
+
+/**
+ * Object for grouping error messages from (most) exceptions thrown during query execution.
+ */
+object SparkCoreErrors {
+  def rddBlockNotFoundError(blockId: BlockId, id: Int): Throwable = {
+    new Exception(s"Could not compute split, block $blockId of RDD $id not 
found")
+  }
+
+  def blockHaveBeenRemovedError(string: String): Throwable = {
+    new SparkException(s"Attempted to use $string after its blocks have been 
removed!")
+  }
+
+  def histogramOnEmptyRDDOrContainingInfinityOrNaNError(): Throwable = {
+    new UnsupportedOperationException(
+      "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
+  }
+
+  def emptyRDDError(): Throwable = {
+    new UnsupportedOperationException("empty RDD")
+  }
+
+  def pathNotSupportedError(path: String): Throwable = {
+    new IOException(s"Path: ${path} is a directory, which is not supported by 
the " +
+      "record reader when 
`mapreduce.input.fileinputformat.input.dir.recursive` is false.")
+  }
+
+  def checkpointRDDBlockIdNotFoundError(rddBlockId: RDDBlockId): Throwable = {
+    new SparkException(
+      s"""
+         |Checkpoint block $rddBlockId not found! Either the executor
+         |that originally checkpointed this partition is no longer alive, or the original RDD is
+         |unpersisted. If this problem persists, you may consider using `rdd.checkpoint()`
+         |instead, which is slower than local checkpointing but more fault-tolerant.
+       """.stripMargin.replaceAll("\n", " "))
+  }
+
+  def endOfStreamError(): Throwable = {
+    new java.util.NoSuchElementException("End of stream")
+  }
+
+  def cannotUseMapSideCombiningWithArrayKeyError(): Throwable = {
+    new SparkException("Cannot use map-side combining with array keys.")
+  }
+
+  def hashPartitionerCannotPartitionArrayKeyError(): Throwable = {
+    new SparkException("HashPartitioner cannot partition array keys.")
+  }
+
+  def reduceByKeyLocallyNotSupportArrayKeysError(): Throwable = {
+    new SparkException("reduceByKeyLocally() does not support array keys")
+  }
+
+  def noSuchElementException(): Throwable = {
+    new NoSuchElementException()
+  }
+
+  def rddLacksSparkContextError(): Throwable = {
+    new SparkException("This RDD lacks a SparkContext. It could happen in the 
following cases: " +
+      "\n(1) RDD transformations and actions are NOT invoked by the driver, 
but inside of other " +
+      "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is 
invalid " +
+      "because the values transformation and count action cannot be performed 
inside of the " +
+      "rdd1.map transformation. For more information, see SPARK-5063.\n(2) 
When a Spark " +
+      "Streaming job recovers from checkpoint, this exception will be hit if a 
reference to " +
+      "an RDD not defined by the streaming job is used in DStream operations. 
For more " +
+      "information, See SPARK-13758.")
+  }
+
+  def cannotChangeStorageLevelError(): Throwable = {
+    new UnsupportedOperationException(
+      "Cannot change storage level of an RDD after it was already assigned a 
level")
+  }
+
+  def canOnlyZipRDDsWithSamePartitionSizeError(): Throwable = {
+    new SparkException("Can only zip RDDs with same number of elements in each 
partition")
+  }
+
+  def emptyCollectionError(): Throwable = {
+    new UnsupportedOperationException("empty collection")
+  }
+
+  def countByValueApproxNotSupportArraysError(): Throwable = {
+    new SparkException("countByValueApprox() does not support arrays")
+  }
+
+  def checkpointDirectoryHasNotBeenSetInSparkContextError(): Throwable = {
+    new SparkException("Checkpoint directory has not been set in the 
SparkContext")
+  }
+
+  def invalidCheckpointFileError(path: Path): Throwable = {
+    new SparkException(s"Invalid checkpoint file: $path")
+  }
+
+  def failToCreateCheckpointPathError(checkpointDirPath: Path): Throwable = {
+    new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
+  }
+
+  def checkpointRDDHasDifferentNumberOfPartitionsFromOriginalRDDError(
+      originalRDDId: Int,
+      originalRDDLength: Int,
+      newRDDId: Int,
+      newRDDLength: Int): Throwable = {
+    new SparkException(
+      s"""
+         |Checkpoint RDD has a different number of partitions from original RDD. Original
+         |RDD [ID: $originalRDDId, num of partitions: $originalRDDLength];
+         |Checkpoint RDD [ID: $newRDDId, num of partitions: $newRDDLength].
+       """.stripMargin.replaceAll("\n", " "))
+  }
+
+  def checkpointFailedToSaveError(task: Int, path: Path): Throwable = {
+    new IOException("Checkpoint failed: failed to save output of task: " +
+      s"$task and final output path does not exist: $path")
+  }
+
+  def mustSpecifyCheckpointDirError(): Throwable = {
+    new SparkException("Checkpoint dir must be specified.")
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index a5c3e2a..05cad3d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.storage.{BlockId, BlockManager}
 
 private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
@@ -47,7 +48,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo
     blockManager.get[T](blockId) match {
       case Some(block) => block.data.asInstanceOf[Iterator[T]]
       case None =>
-        throw new Exception(s"Could not compute split, block $blockId of RDD 
$id not found")
+        throw SparkCoreErrors.rddBlockNotFoundError(blockId, id)
     }
   }
 
@@ -79,8 +80,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo
   /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
   private[spark] def assertValid(): Unit = {
     if (!isValid) {
-      throw new SparkException(
-        "Attempted to use %s after its blocks have been 
removed!".format(toString))
+      throw SparkCoreErrors.blockHaveBeenRemovedError(toString)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 39f6956..9c97e02 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
 
 import org.apache.spark.TaskContext
 import org.apache.spark.annotation.Since
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.MeanEvaluator
@@ -135,8 +136,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
     }
     if (min.isNaN || max.isNaN || max.isInfinity || min.isInfinity ) {
-      throw new UnsupportedOperationException(
-        "Histogram on either an empty RDD or RDD containing +/-infinity or 
NaN")
+      throw SparkCoreErrors.histogramOnEmptyRDDOrContainingInfinityOrNaNError()
     }
     val range = if (min != max) {
       // Range.Double.inclusive(min, max, increment)
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
index a2d7e34..8b75b3c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.errors.SparkCoreErrors
 
 /**
  * An RDD that has no partitions and no elements.
@@ -29,6 +30,6 @@ private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc,
   override def getPartitions: Array[Partition] = Array.empty
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    throw new UnsupportedOperationException("empty RDD")
+    throw SparkCoreErrors.emptyRDDError()
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 5fc0b4f..7011451 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -36,6 +36,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
@@ -234,8 +235,7 @@ class HadoopRDD[K, V](
         Array.empty[Partition]
       case e: IOException if e.getMessage.startsWith("Not a file:") =>
         val path = e.getMessage.split(":").map(_.trim).apply(2)
-        throw new IOException(s"Path: ${path} is a directory, which is not 
supported by the " +
-          s"record reader when 
`mapreduce.input.fileinputformat.input.dir.recursive` is false.")
+        throw SparkCoreErrors.pathNotSupportedError(path)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
index 113ed2d..342cf66 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
@@ -19,7 +19,8 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.storage.RDDBlockId
 
 /**
@@ -57,11 +58,7 @@ private[spark] class LocalCheckpointRDD[T: ClassTag](
    * available in the block storage.
    */
   override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
-    throw new SparkException(
-      s"Checkpoint block ${RDDBlockId(rddId, partition.index)} not found! 
Either the executor " +
-      s"that originally checkpointed this partition is no longer alive, or the 
original RDD is " +
-      s"unpersisted. If this problem persists, you may consider using 
`rdd.checkpoint()` " +
-      s"instead, which is slower than local checkpointing but more 
fault-tolerant.")
+    throw SparkCoreErrors.checkpointRDDBlockIdNotFoundError(RDDBlockId(rddId, partition.index))
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index a7a6cf4..7fdbe7b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
@@ -270,7 +271,7 @@ class NewHadoopRDD[K, V](
 
       override def next(): (K, V) = {
         if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
+          throw SparkCoreErrors.endOfStreamError()
         }
         havePair = false
         if (!finished) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f280c22..4dd2967 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewO
 
 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.SPECULATION_ENABLED
 import org.apache.spark.internal.io._
@@ -76,10 +77,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
     if (keyClass.isArray) {
       if (mapSideCombine) {
-        throw new SparkException("Cannot use map-side combining with array 
keys.")
+        throw SparkCoreErrors.cannotUseMapSideCombiningWithArrayKeyError()
       }
       if (partitioner.isInstanceOf[HashPartitioner]) {
-        throw new SparkException("HashPartitioner cannot partition array 
keys.")
+        throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
       }
     }
     val aggregator = new Aggregator[K, V, C](
@@ -331,7 +332,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val cleanedF = self.sparkContext.clean(func)
 
     if (keyClass.isArray) {
-      throw new SparkException("reduceByKeyLocally() does not support array 
keys")
+      throw SparkCoreErrors.reduceByKeyLocallyNotSupportArrayKeysError()
     }
 
     val reducePartition = (iter: Iterator[(K, V)]) => {
@@ -524,7 +525,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
     if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
-      throw new SparkException("HashPartitioner cannot partition array keys.")
+      throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
     }
     if (self.partitioner == Some(partitioner)) {
       self
@@ -776,7 +777,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       partitioner: Partitioner)
       : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
     if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
-      throw new SparkException("HashPartitioner cannot partition array keys.")
+      throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
     cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
@@ -794,7 +795,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
     if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
-      throw new SparkException("HashPartitioner cannot partition array keys.")
+      throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
     cg.mapValues { case Array(vs, w1s) =>
@@ -809,7 +810,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
       : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
     if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
-      throw new SparkException("HashPartitioner cannot partition array keys.")
+      throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Array(vs, w1s, w2s) =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 5dd8cb8..3cf0dd2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -33,6 +33,7 @@ import scala.io.Source
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.util.Utils
 
 
@@ -184,7 +185,7 @@ private[spark] class PipedRDD[T: ClassTag](
     new Iterator[String] {
       def next(): String = {
         if (!hasNext()) {
-          throw new NoSuchElementException()
+          throw SparkCoreErrors.noSuchElementException()
         }
         lines.next()
       }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index eb7bf4d..35e53b6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -36,6 +36,7 @@ import org.apache.spark._
 import org.apache.spark.Partitioner._
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.RDD_LIMIT_SCALE_UP_FACTOR
@@ -92,15 +93,7 @@ abstract class RDD[T: ClassTag](
 
   private def sc: SparkContext = {
     if (_sc == null) {
-      throw new SparkException(
-        "This RDD lacks a SparkContext. It could happen in the following 
cases: \n(1) RDD " +
-        "transformations and actions are NOT invoked by the driver, but inside 
of other " +
-        "transformations; for example, rdd1.map(x => rdd2.values.count() * x) 
is invalid " +
-        "because the values transformation and count action cannot be 
performed inside of the " +
-        "rdd1.map transformation. For more information, see SPARK-5063.\n(2) 
When a Spark " +
-        "Streaming job recovers from checkpoint, this exception will be hit if 
a reference to " +
-        "an RDD not defined by the streaming job is used in DStream 
operations. For more " +
-        "information, See SPARK-13758.")
+      throw SparkCoreErrors.rddLacksSparkContextError()
     }
     _sc
   }
@@ -172,8 +165,7 @@ abstract class RDD[T: ClassTag](
   private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
     // TODO: Handle changes of StorageLevel
     if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
-      throw new UnsupportedOperationException(
-        "Cannot change storage level of an RDD after it was already assigned a 
level")
+      throw SparkCoreErrors.cannotChangeStorageLevelError()
     }
     // If this is the first time this RDD is marked for persisting, register it
     // with the SparkContext for cleanups and accounting. Do this only once.
@@ -951,8 +943,7 @@ abstract class RDD[T: ClassTag](
         def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
           case (true, true) => true
           case (false, false) => false
-          case _ => throw new SparkException("Can only zip RDDs with " +
-            "same number of elements in each partition")
+          case _ => throw SparkCoreErrors.canOnlyZipRDDsWithSamePartitionSizeError()
         }
         def next(): (T, U) = (thisIter.next(), otherIter.next())
       }
@@ -1119,7 +1110,7 @@ abstract class RDD[T: ClassTag](
     }
     sc.runJob(this, reducePartition, mergeResult)
     // Get the final result out of our Option, or throw an exception if the RDD was empty
-    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
+    jobResult.getOrElse(throw SparkCoreErrors.emptyCollectionError())
   }
 
   /**
@@ -1151,7 +1142,7 @@ abstract class RDD[T: ClassTag](
       }
     }
     partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
-      .getOrElse(throw new UnsupportedOperationException("empty collection"))
+      .getOrElse(throw SparkCoreErrors.emptyCollectionError())
   }
 
   /**
@@ -1311,7 +1302,7 @@ abstract class RDD[T: ClassTag](
       : PartialResult[Map[T, BoundedDouble]] = withScope {
     require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
     if (elementClassTag.runtimeClass.isArray) {
-      throw new SparkException("countByValueApprox() does not support arrays")
+      throw SparkCoreErrors.countByValueApproxNotSupportArraysError()
     }
     val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (_, iter) =>
       val map = new OpenHashMap[T, Long]
@@ -1462,7 +1453,7 @@ abstract class RDD[T: ClassTag](
   def first(): T = withScope {
     take(1) match {
       case Array(t) => t
-      case _ => throw new UnsupportedOperationException("empty collection")
+      case _ => throw SparkCoreErrors.emptyCollectionError()
     }
   }
 
@@ -1612,7 +1603,7 @@ abstract class RDD[T: ClassTag](
     // children RDD partitions point to the correct parent partitions. In the future
     // we should revisit this consideration.
     if (context.checkpointDir.isEmpty) {
-      throw new SparkException("Checkpoint directory has not been set in the 
SparkContext")
+      throw 
SparkCoreErrors.checkpointDirectoryHasNotBeenSetInSparkContextError()
     } else if (checkpointData.isEmpty) {
       checkpointData = Some(new ReliableRDDCheckpointData(this))
     }
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 5093a12..7339eb6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.{FileNotFoundException, IOException}
+import java.io.FileNotFoundException
 import java.util.concurrent.TimeUnit
 
 import scala.reflect.ClassTag
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{BUFFER_SIZE, CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME, CHECKPOINT_COMPRESS}
 import org.apache.spark.io.CompressionCodec
@@ -77,7 +78,7 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
     // Fail fast if input files are invalid
     inputFiles.zipWithIndex.foreach { case (path, i) =>
       if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) {
-        throw new SparkException(s"Invalid checkpoint file: $path")
+        throw SparkCoreErrors.invalidCheckpointFileError(path)
       }
     }
     Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i))
@@ -155,7 +156,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     val checkpointDirPath = new Path(checkpointDir)
     val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
     if (!fs.mkdirs(checkpointDirPath)) {
-      throw new SparkException(s"Failed to create checkpoint path 
$checkpointDirPath")
+      throw SparkCoreErrors.failToCreateCheckpointPathError(checkpointDirPath)
     }
 
     // Save to file, and reload it as an RDD
@@ -176,11 +177,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     val newRDD = new ReliableCheckpointRDD[T](
       sc, checkpointDirPath.toString, originalRDD.partitioner)
     if (newRDD.partitions.length != originalRDD.partitions.length) {
-      throw new SparkException(
-        "Checkpoint RDD has a different number of partitions from original 
RDD. Original " +
-          s"RDD [ID: ${originalRDD.id}, num of partitions: 
${originalRDD.partitions.length}]; " +
-          s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
-          s"${newRDD.partitions.length}].")
+      throw SparkCoreErrors.checkpointRDDHasDifferentNumberOfPartitionsFromOriginalRDDError(
+        originalRDD.id, originalRDD.partitions.length, newRDD.id, newRDD.partitions.length)
     }
     newRDD
   }
@@ -231,8 +229,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       if (!fs.exists(finalOutputPath)) {
         logInfo(s"Deleting tempOutputPath $tempOutputPath")
         fs.delete(tempOutputPath, false)
-        throw new IOException("Checkpoint failed: failed to save output of 
task: " +
-          s"${ctx.attemptNumber()} and final output path does not exist: 
$finalOutputPath")
+        throw SparkCoreErrors.checkpointFailedToSaveError(ctx.attemptNumber(), 
finalOutputPath)
       } else {
         // Some other copy of this task must've finished before us and renamed it
         logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 7a592ab..0a26b7b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -22,6 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS
 
@@ -37,7 +38,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
   private val cpDir: String =
     ReliableRDDCheckpointData.checkpointPath(rdd.context, rdd.id)
       .map(_.toString)
-      .getOrElse { throw new SparkException("Checkpoint dir must be 
specified.") }
+      .getOrElse { throw SparkCoreErrors.mustSpecifyCheckpointDirError() }
 
   /**
    * Return the directory to which this RDD was checkpointed.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
