This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new af6d04b [SPARK-36095][CORE] Grouping exception in core/rdd af6d04b is described below commit af6d04b65ca45c0d8e1aacc93da7be271b586506 Author: dgd-contributor <dgd_contribu...@viettel.com.vn> AuthorDate: Wed Jul 28 22:01:26 2021 +0800 [SPARK-36095][CORE] Grouping exception in core/rdd ### What changes were proposed in this pull request? This PR group exception messages in core/src/main/scala/org/apache/spark/rdd ### Why are the changes needed? It will largely help with standardization of error messages and its maintenance. ### Does this PR introduce _any_ user-facing change? No. Error messages remain unchanged. ### How was this patch tested? No new tests - pass all original tests to make sure it doesn't break any existing behavior. Closes #33317 from dgd-contributor/SPARK-36095_GroupExceptionCoreRdd. Lead-authored-by: dgd-contributor <dgd_contribu...@viettel.com.vn> Co-authored-by: Wenchen Fan <cloud0...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../org/apache/spark/errors/SparkCoreErrors.scala | 144 +++++++++++++++++++++ .../main/scala/org/apache/spark/rdd/BlockRDD.scala | 6 +- .../org/apache/spark/rdd/DoubleRDDFunctions.scala | 4 +- .../main/scala/org/apache/spark/rdd/EmptyRDD.scala | 3 +- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 4 +- .../org/apache/spark/rdd/LocalCheckpointRDD.scala | 9 +- .../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 3 +- .../org/apache/spark/rdd/PairRDDFunctions.scala | 15 ++- .../main/scala/org/apache/spark/rdd/PipedRDD.scala | 3 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 27 ++-- .../apache/spark/rdd/ReliableCheckpointRDD.scala | 17 +-- .../spark/rdd/ReliableRDDCheckpointData.scala | 3 +- 12 files changed, 186 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala new file mode 100644 index 0000000..a29f375 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.errors + +import java.io.IOException + +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkException +import org.apache.spark.storage.{BlockId, RDDBlockId} + +/** + * Object for grouping error messages from (most) exceptions thrown during query execution. 
+ */ +object SparkCoreErrors { + def rddBlockNotFoundError(blockId: BlockId, id: Int): Throwable = { + new Exception(s"Could not compute split, block $blockId of RDD $id not found") + } + + def blockHaveBeenRemovedError(string: String): Throwable = { + new SparkException(s"Attempted to use $string after its blocks have been removed!") + } + + def histogramOnEmptyRDDOrContainingInfinityOrNaNError(): Throwable = { + new UnsupportedOperationException( + "Histogram on either an empty RDD or RDD containing +/-infinity or NaN") + } + + def emptyRDDError(): Throwable = { + new UnsupportedOperationException("empty RDD") + } + + def pathNotSupportedError(path: String): Throwable = { + new IOException(s"Path: ${path} is a directory, which is not supported by the " + + "record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.") + } + + def checkpointRDDBlockIdNotFoundError(rddBlockId: RDDBlockId): Throwable = { + new SparkException( + s""" + |Checkpoint block $rddBlockId not found! Either the executor + |that originally checkpointed this partition is no longer alive, or the original RDD is + |unpersisted. If this problem persists, you may consider using `rdd.checkpoint()` + |instead, which is slower than local checkpointing but more fault-tolerant. + """.stripMargin.replaceAll("\n", " ")) + } + + def endOfStreamError(): Throwable = { + new java.util.NoSuchElementException("End of stream") + } + + def cannotUseMapSideCombiningWithArrayKeyError(): Throwable = { + new SparkException("Cannot use map-side combining with array keys.") + } + + def hashPartitionerCannotPartitionArrayKeyError(): Throwable = { + new SparkException("HashPartitioner cannot partition array keys.") + } + + def reduceByKeyLocallyNotSupportArrayKeysError(): Throwable = { + new SparkException("reduceByKeyLocally() does not support array keys") + } + + def noSuchElementException(): Throwable = { + new NoSuchElementException() + } + + def rddLacksSparkContextError(): Throwable = { + new SparkException("This RDD lacks a SparkContext. It could happen in the following cases: " + + "\n(1) RDD transformations and actions are NOT invoked by the driver, but inside of other " + + "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " + + "because the values transformation and count action cannot be performed inside of the " + + "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " + + "Streaming job recovers from checkpoint, this exception will be hit if a reference to " + + "an RDD not defined by the streaming job is used in DStream operations. 
For more " + + "information, See SPARK-13758.") + } + + def cannotChangeStorageLevelError(): Throwable = { + new UnsupportedOperationException( + "Cannot change storage level of an RDD after it was already assigned a level") + } + + def canOnlyZipRDDsWithSamePartitionSizeError(): Throwable = { + new SparkException("Can only zip RDDs with same number of elements in each partition") + } + + def emptyCollectionError(): Throwable = { + new UnsupportedOperationException("empty collection") + } + + def countByValueApproxNotSupportArraysError(): Throwable = { + new SparkException("countByValueApprox() does not support arrays") + } + + def checkpointDirectoryHasNotBeenSetInSparkContextError(): Throwable = { + new SparkException("Checkpoint directory has not been set in the SparkContext") + } + + def invalidCheckpointFileError(path: Path): Throwable = { + new SparkException(s"Invalid checkpoint file: $path") + } + + def failToCreateCheckpointPathError(checkpointDirPath: Path): Throwable = { + new SparkException(s"Failed to create checkpoint path $checkpointDirPath") + } + + def checkpointRDDHasDifferentNumberOfPartitionsFromOriginalRDDError( + originalRDDId: Int, + originalRDDLength: Int, + newRDDId: Int, + newRDDLength: Int): Throwable = { + new SparkException( + s""" + |Checkpoint RDD has a different number of partitions from original RDD. Original + |RDD [ID: $originalRDDId, num of partitions: $originalRDDLength]; + |Checkpoint RDD [ID: $newRDDId, num of partitions: $newRDDLength]. + """.stripMargin.replaceAll("\n", " ")) + } + + def checkpointFailedToSaveError(task: Int, path: Path): Throwable = { + new IOException("Checkpoint failed: failed to save output of task: " + + s"$task and final output path does not exist: $path") + } + + def mustSpecifyCheckpointDirError(): Throwable = { + new SparkException("Checkpoint dir must be specified.") + } +} diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index a5c3e2a..05cad3d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -20,6 +20,7 @@ package org.apache.spark.rdd import scala.reflect.ClassTag import org.apache.spark._ +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.storage.{BlockId, BlockManager} private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition { @@ -47,7 +48,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo blockManager.get[T](blockId) match { case Some(block) => block.data.asInstanceOf[Iterator[T]] case None => - throw new Exception(s"Could not compute split, block $blockId of RDD $id not found") + throw SparkCoreErrors.rddBlockNotFoundError(blockId, id) } } @@ -79,8 +80,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo /** Check if this BlockRDD is valid. If not valid, exception is thrown. 
*/ private[spark] def assertValid(): Unit = { if (!isValid) { - throw new SparkException( - "Attempted to use %s after its blocks have been removed!".format(toString)) + throw SparkCoreErrors.blockHaveBeenRemovedError(toString) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index 39f6956..9c97e02 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -19,6 +19,7 @@ package org.apache.spark.rdd import org.apache.spark.TaskContext import org.apache.spark.annotation.Since +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.Logging import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.MeanEvaluator @@ -135,8 +136,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2)) } if (min.isNaN || max.isNaN || max.isInfinity || min.isInfinity ) { - throw new UnsupportedOperationException( - "Histogram on either an empty RDD or RDD containing +/-infinity or NaN") + throw SparkCoreErrors.histogramOnEmptyRDDOrContainingInfinityOrNaNError() } val range = if (min != max) { // Range.Double.inclusive(min, max, increment) diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala index a2d7e34..8b75b3c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala @@ -20,6 +20,7 @@ package org.apache.spark.rdd import scala.reflect.ClassTag import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.errors.SparkCoreErrors /** * An RDD that has no partitions and no elements. 
@@ -29,6 +30,6 @@ private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, override def getPartitions: Array[Partition] = Array.empty override def compute(split: Partition, context: TaskContext): Iterator[T] = { - throw new UnsupportedOperationException("empty RDD") + throw SparkCoreErrors.emptyRDDError() } } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 5fc0b4f..7011451 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -36,6 +36,7 @@ import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD @@ -234,8 +235,7 @@ class HadoopRDD[K, V]( Array.empty[Partition] case e: IOException if e.getMessage.startsWith("Not a file:") => val path = e.getMessage.split(":").map(_.trim).apply(2) - throw new IOException(s"Path: ${path} is a directory, which is not supported by the " + - s"record reader when `mapreduce.input.fileinputformat.input.dir.recursive` is false.") + throw SparkCoreErrors.pathNotSupportedError(path) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala index 113ed2d..342cf66 100644 --- a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala @@ -19,7 +19,8 @@ package org.apache.spark.rdd import scala.reflect.ClassTag -import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.storage.RDDBlockId /** @@ -57,11 +58,7 @@ private[spark] class LocalCheckpointRDD[T: ClassTag]( * available in the block storage. */ override def compute(partition: Partition, context: TaskContext): Iterator[T] = { - throw new SparkException( - s"Checkpoint block ${RDDBlockId(rddId, partition.index)} not found! Either the executor " + - s"that originally checkpointed this partition is no longer alive, or the original RDD is " + - s"unpersisted. 
If this problem persists, you may consider using `rdd.checkpoint()` " + - s"instead, which is slower than local checkpointing but more fault-tolerant.") + throw SparkCoreErrors.checkpointRDDBlockIdNotFoundError(RDDBlockId(rddId, partition.index)) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index a7a6cf4..7fdbe7b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl} import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD @@ -270,7 +271,7 @@ class NewHadoopRDD[K, V]( override def next(): (K, V) = { if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") + throw SparkCoreErrors.endOfStreamError() } havePair = false if (!finished) { diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index f280c22..4dd2967 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewO import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.Logging import org.apache.spark.internal.config.SPECULATION_ENABLED import org.apache.spark.internal.io._ @@ -76,10 +77,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { - throw new SparkException("Cannot use map-side combining with array keys.") + throw SparkCoreErrors.cannotUseMapSideCombiningWithArrayKeyError() } if (partitioner.isInstanceOf[HashPartitioner]) { - throw new SparkException("HashPartitioner cannot partition array keys.") + throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError() } } val aggregator = new Aggregator[K, V, C]( @@ -331,7 +332,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val cleanedF = self.sparkContext.clean(func) if (keyClass.isArray) { - throw new SparkException("reduceByKeyLocally() does not support array keys") + throw SparkCoreErrors.reduceByKeyLocallyNotSupportArrayKeysError() } val reducePartition = (iter: Iterator[(K, V)]) => { @@ -524,7 +525,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope { if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) { - throw new SparkException("HashPartitioner cannot partition array keys.") + throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError() } if (self.partitioner == Some(partitioner)) { self @@ -776,7 +777,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { - throw new SparkException("HashPartitioner cannot partition array 
keys.") + throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError() } val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner) cg.mapValues { case Array(vs, w1s, w2s, w3s) => @@ -794,7 +795,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { - throw new SparkException("HashPartitioner cannot partition array keys.") + throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError() } val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) cg.mapValues { case Array(vs, w1s) => @@ -809,7 +810,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { - throw new SparkException("HashPartitioner cannot partition array keys.") + throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError() } val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner) cg.mapValues { case Array(vs, w1s, w2s) => diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 5dd8cb8..3cf0dd2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -33,6 +33,7 @@ import scala.io.Source import scala.reflect.ClassTag import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.util.Utils @@ -184,7 +185,7 @@ private[spark] class PipedRDD[T: ClassTag]( new Iterator[String] { def next(): String = { if (!hasNext()) { - throw new NoSuchElementException() + throw SparkCoreErrors.noSuchElementException() } lines.next() } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index eb7bf4d..35e53b6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -36,6 +36,7 @@ import org.apache.spark._ import org.apache.spark.Partitioner._ import org.apache.spark.annotation.{DeveloperApi, Experimental, Since} import org.apache.spark.api.java.JavaRDD +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.RDD_LIMIT_SCALE_UP_FACTOR @@ -92,15 +93,7 @@ abstract class RDD[T: ClassTag]( private def sc: SparkContext = { if (_sc == null) { - throw new SparkException( - "This RDD lacks a SparkContext. It could happen in the following cases: \n(1) RDD " + - "transformations and actions are NOT invoked by the driver, but inside of other " + - "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " + - "because the values transformation and count action cannot be performed inside of the " + - "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " + - "Streaming job recovers from checkpoint, this exception will be hit if a reference to " + - "an RDD not defined by the streaming job is used in DStream operations. 
For more " + - "information, See SPARK-13758.") + throw SparkCoreErrors.rddLacksSparkContextError() } _sc } @@ -172,8 +165,7 @@ abstract class RDD[T: ClassTag]( private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) { - throw new UnsupportedOperationException( - "Cannot change storage level of an RDD after it was already assigned a level") + throw SparkCoreErrors.cannotChangeStorageLevelError() } // If this is the first time this RDD is marked for persisting, register it // with the SparkContext for cleanups and accounting. Do this only once. @@ -951,8 +943,7 @@ abstract class RDD[T: ClassTag]( def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match { case (true, true) => true case (false, false) => false - case _ => throw new SparkException("Can only zip RDDs with " + - "same number of elements in each partition") + case _ => throw SparkCoreErrors.canOnlyZipRDDsWithSamePartitionSizeError() } def next(): (T, U) = (thisIter.next(), otherIter.next()) } @@ -1119,7 +1110,7 @@ abstract class RDD[T: ClassTag]( } sc.runJob(this, reducePartition, mergeResult) // Get the final result out of our Option, or throw an exception if the RDD was empty - jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) + jobResult.getOrElse(throw SparkCoreErrors.emptyCollectionError()) } /** @@ -1151,7 +1142,7 @@ abstract class RDD[T: ClassTag]( } } partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth) - .getOrElse(throw new UnsupportedOperationException("empty collection")) + .getOrElse(throw SparkCoreErrors.emptyCollectionError()) } /** @@ -1311,7 +1302,7 @@ abstract class RDD[T: ClassTag]( : PartialResult[Map[T, BoundedDouble]] = withScope { require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]") if (elementClassTag.runtimeClass.isArray) { - throw new SparkException("countByValueApprox() does not support arrays") + throw SparkCoreErrors.countByValueApproxNotSupportArraysError() } val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (_, iter) => val map = new OpenHashMap[T, Long] @@ -1462,7 +1453,7 @@ abstract class RDD[T: ClassTag]( def first(): T = withScope { take(1) match { case Array(t) => t - case _ => throw new UnsupportedOperationException("empty collection") + case _ => throw SparkCoreErrors.emptyCollectionError() } } @@ -1612,7 +1603,7 @@ abstract class RDD[T: ClassTag]( // children RDD partitions point to the correct parent partitions. In the future // we should revisit this consideration. 
if (context.checkpointDir.isEmpty) { - throw new SparkException("Checkpoint directory has not been set in the SparkContext") + throw SparkCoreErrors.checkpointDirectoryHasNotBeenSetInSparkContextError() } else if (checkpointData.isEmpty) { checkpointData = Some(new ReliableRDDCheckpointData(this)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index 5093a12..7339eb6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.rdd -import java.io.{FileNotFoundException, IOException} +import java.io.FileNotFoundException import java.util.concurrent.TimeUnit import scala.reflect.ClassTag @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.broadcast.Broadcast +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{BUFFER_SIZE, CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME, CHECKPOINT_COMPRESS} import org.apache.spark.io.CompressionCodec @@ -77,7 +78,7 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag]( // Fail fast if input files are invalid inputFiles.zipWithIndex.foreach { case (path, i) => if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) { - throw new SparkException(s"Invalid checkpoint file: $path") + throw SparkCoreErrors.invalidCheckpointFileError(path) } } Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i)) @@ -155,7 +156,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { val checkpointDirPath = new Path(checkpointDir) val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration) if (!fs.mkdirs(checkpointDirPath)) { - throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath") + throw SparkCoreErrors.failToCreateCheckpointPathError(checkpointDirPath) } // Save to file, and reload it as an RDD @@ -176,11 +177,8 @@ private[spark] object ReliableCheckpointRDD extends Logging { val newRDD = new ReliableCheckpointRDD[T]( sc, checkpointDirPath.toString, originalRDD.partitioner) if (newRDD.partitions.length != originalRDD.partitions.length) { - throw new SparkException( - "Checkpoint RDD has a different number of partitions from original RDD. 
Original " + - s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " + - s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " + - s"${newRDD.partitions.length}].") + throw SparkCoreErrors.checkpointRDDHasDifferentNumberOfPartitionsFromOriginalRDDError( + originalRDD.id, originalRDD.partitions.length, newRDD.id, newRDD.partitions.length) } newRDD } @@ -231,8 +229,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { if (!fs.exists(finalOutputPath)) { logInfo(s"Deleting tempOutputPath $tempOutputPath") fs.delete(tempOutputPath, false) - throw new IOException("Checkpoint failed: failed to save output of task: " + - s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath") + throw SparkCoreErrors.checkpointFailedToSaveError(ctx.attemptNumber(), finalOutputPath) } else { // Some other copy of this task must've finished before us and renamed it logInfo(s"Final output path $finalOutputPath already exists; not overwriting it") diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala index 7a592ab..0a26b7b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala @@ -22,6 +22,7 @@ import scala.reflect.ClassTag import org.apache.hadoop.fs.Path import org.apache.spark._ +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.Logging import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS @@ -37,7 +38,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v private val cpDir: String = ReliableRDDCheckpointData.checkpointPath(rdd.context, rdd.id) .map(_.toString) - .getOrElse { throw new SparkException("Checkpoint dir must be specified.") } + .getOrElse { throw SparkCoreErrors.mustSpecifyCheckpointDirError() } /** * Return the directory to which this RDD was checkpointed. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org