This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new c2bd7bac76a [SPARK-39165][SQL][3.3] Replace `sys.error` by `IllegalStateException`
c2bd7bac76a is described below

commit c2bd7bac76a5cf7ffc5ef61a1df2b8bb5a72f131
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Fri May 13 12:47:53 2022 +0300

    [SPARK-39165][SQL][3.3] Replace `sys.error` by `IllegalStateException`
    
    ### What changes were proposed in this pull request?
    Replace all invocations of `sys.error()` in the `sql` namespace with throwing
    `IllegalStateException`.
    
    This is a backport of https://github.com/apache/spark/pull/36524.
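
    A minimal sketch of the mechanical rewrite (the helper names are illustrative and not taken
    from the diff below):

    ```scala
    // Before: sys.error throws a plain RuntimeException, with no distinct type to match on.
    def failBefore(msg: String): Nothing = sys.error(msg)

    // After: internal errors carry a distinct, matchable exception type.
    def failAfter(msg: String): Nothing = throw new IllegalStateException(msg)
    ```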
    
    ### Why are the changes needed?
    In the context of wrapping all internal errors such as asserts/illegal-state
    exceptions (see https://github.com/apache/spark/pull/36500), it is impossible to
    distinguish the `RuntimeException` thrown by `sys.error()` from Spark's exceptions
    such as `SparkRuntimeException`. The latter can be propagated to user space, but
    `sys.error` exceptions shouldn't be visible to users in regular cases.
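
    A hedged illustration of the ambiguity in plain Scala. `UserFacingError` is a
    hypothetical stand-in for a user-facing exception such as `SparkRuntimeException`,
    which also extends `RuntimeException`:

    ```scala
    object ErrorAmbiguitySketch {
      // Hypothetical user-facing exception type, standing in for SparkRuntimeException.
      class UserFacingError(msg: String) extends RuntimeException(msg)

      // IllegalStateException and UserFacingError can be matched precisely;
      // the bare RuntimeException from sys.error only hits the catch-all case.
      def classify(t: Throwable): String = t match {
        case _: IllegalStateException => "internal bug: wrap and hide from users"
        case _: UserFacingError       => "user-facing: propagate"
        case _: RuntimeException      => "ambiguous: sys.error lands here"
        case _                        => "other"
      }

      def main(args: Array[String]): Unit = {
        val fromSysError: Throwable =
          try sys.error("boom") catch { case e: Throwable => e }
        println(classify(fromSysError))                      // ambiguous: sys.error lands here
        println(classify(new UserFacingError("bad input")))  // user-facing: propagate
        println(classify(new IllegalStateException("bug")))  // internal bug: wrap and hide from users
      }
    }
    ```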
    
    ### Does this PR introduce _any_ user-facing change?
    No, it shouldn't. `sys.error` exceptions shouldn't propagate to user space in
    regular cases.
    
    ### How was this patch tested?
    By running the existing test suites.
    
    Authored-by: Max Gekk <max.gekk@gmail.com>
    Signed-off-by: Max Gekk <max.gekk@gmail.com>
    (cherry picked from commit 95c7efd7571464d8adfb76fb22e47a5816cf73fb)
    Signed-off-by: Max Gekk <max.gekk@gmail.com>
    
    Closes #36532 from MaxGekk/sys_error-internal-3.3.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../scala/org/apache/spark/sql/execution/SparkStrategies.scala    | 4 ++--
 .../org/apache/spark/sql/execution/datasources/DataSource.scala   | 8 ++++----
 .../sql/execution/datasources/parquet/ParquetWriteSupport.scala   | 3 +--
 .../apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 ++--
 .../org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala | 5 +++--
 .../scala/org/apache/spark/sql/execution/streaming/memory.scala   | 3 ++-
 .../execution/streaming/sources/TextSocketMicroBatchStream.scala  | 3 ++-
 .../src/main/scala/org/apache/spark/sql/execution/subquery.scala  | 3 ++-
 .../apache/spark/sql/execution/window/AggregateProcessor.scala    | 2 +-
 .../org/apache/spark/sql/execution/window/WindowExecBase.scala    | 8 ++++----
 .../src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 3 ++-
 .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala   | 2 +-
 12 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 3b8a70ffe94..17f3cfbda89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -503,8 +503,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           _.aggregateFunction.children.filterNot(_.foldable).toSet).distinct.length > 1) {
           // This is a sanity check. We should not reach here when we have multiple distinct
           // column sets. Our `RewriteDistinctAggregates` should take care this case.
-          sys.error("You hit a query analyzer bug. Please report your query to " +
-              "Spark user mailing list.")
+          throw new IllegalStateException(
+            "You hit a query analyzer bug. Please report your query to Spark user mailing list.")
         }
 
         // Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here because
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 2bb3d48c145..143fb4cf960 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -539,8 +539,8 @@ case class DataSource(
         DataWritingCommand.propogateMetrics(sparkSession.sparkContext, resolved, metrics)
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
         copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
-      case _ =>
-        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+      case _ => throw new IllegalStateException(
+        s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
   }
 
@@ -556,8 +556,8 @@ case class DataSource(
         disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false)
         DataSource.validateSchema(data.schema)
         planForWritingFileFormat(format, mode, data)
-      case _ =>
-        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+      case _ => throw new IllegalStateException(
+        s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index e71863657dd..a4122fe0bdf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -254,8 +254,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
 
       case t: UserDefinedType[_] => makeWriter(t.sqlType)
 
-      // TODO Adds IntervalType support
-      case _ => sys.error(s"Unsupported data type $dataType.")
+      case _ => throw new IllegalStateException(s"Unsupported data type $dataType.")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index c033aedc778..f3eb5636bb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -300,7 +300,7 @@ object ShuffleExchangeExec {
           override def numPartitions: Int = 1
           override def getPartition(key: Any): Int = 0
         }
-      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
+      case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
     }
     def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
@@ -319,7 +319,7 @@ object ShuffleExchangeExec {
         val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
         row => projection(row)
       case SinglePartition => identity
-      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
+      case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
     }
 
     val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index a809ea07d0e..a6a5423b1f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -284,8 +284,9 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
       }
       // Other cases are disallowed as they are ambiguous or would require a cartesian
       // product.
-      udfs.map(canonicalizeDeterministic).filterNot(attributeMap.contains).foreach {
-        udf => sys.error(s"Invalid PythonUDF $udf, requires attributes from more than one child.")
+      udfs.map(canonicalizeDeterministic).filterNot(attributeMap.contains).foreach { udf =>
+        throw new IllegalStateException(
+          s"Invalid PythonUDF $udf, requires attributes from more than one child.")
       }
 
       val rewritten = plan.withNewChildren(newChildren).transformExpressions {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index dd09a38c8b3..1d377350253 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -257,7 +257,8 @@ case class MemoryStream[A : Encoder](
     val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
 
     if (offsetDiff < 0) {
-      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      throw new IllegalStateException(
+        s"Offsets committed out of order: $lastOffsetCommitted followed by 
$end")
     }
 
     batches.trimStart(offsetDiff)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index 04431f3d381..580f7066e44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -155,7 +155,8 @@ class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int)
     val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
 
     if (offsetDiff < 0) {
-      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      throw new IllegalStateException(
+        s"Offsets committed out of order: $lastOffsetCommitted followed by 
$end")
     }
 
     batches.trimStart(offsetDiff)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index afd0aba0068..4bbfc3467d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -79,7 +79,8 @@ case class ScalarSubquery(
   def updateResult(): Unit = {
     val rows = plan.executeCollect()
     if (rows.length > 1) {
-      sys.error(s"more than one row returned by a subquery used as an expression:\n$plan")
+      throw new IllegalStateException(
+        s"more than one row returned by a subquery used as an 
expression:\n$plan")
     }
     if (rows.length == 1) {
       assert(rows(0).numFields == 1,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index 1ebbd5f4064..e40373917c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -90,7 +90,7 @@ private[window] object AggregateProcessor {
         updateExpressions ++= noOps
         evaluateExpressions += imperative
       case other =>
-        sys.error(s"Unsupported aggregate function: $other")
+        throw new IllegalStateException(s"Unsupported aggregate function: $other")
     }
 
     // Create the projections.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
index 5f1758d12fd..31b7df1abd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
@@ -97,7 +97,7 @@ trait WindowExecBase extends UnaryExecNode {
         RowBoundOrdering(offset)
 
       case (RowFrame, _) =>
-        sys.error(s"Unhandled bound in windows expressions: $bound")
+        throw new IllegalStateException(s"Unhandled bound in windows expressions: $bound")
 
       case (RangeFrame, CurrentRow) =>
         val ordering = RowOrdering.create(orderSpec, child.output)
@@ -139,7 +139,7 @@ trait WindowExecBase extends UnaryExecNode {
         RangeBoundOrdering(ordering, current, bound)
 
       case (RangeFrame, _) =>
-        sys.error("Non-Zero range offsets are not supported for windows " +
+        throw new IllegalStateException("Non-Zero range offsets are not supported for windows " +
           "with multiple order expressions.")
     }
   }
@@ -189,7 +189,7 @@ trait WindowExecBase extends UnaryExecNode {
               }
             case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f)
             case f: PythonUDF => collect("AGGREGATE", frame, e, f)
-            case f => sys.error(s"Unsupported window function: $f")
+            case f => throw new IllegalStateException(s"Unsupported window function: $f")
           }
         case _ =>
       }
@@ -296,7 +296,7 @@ trait WindowExecBase extends UnaryExecNode {
             }
 
           case _ =>
-            sys.error(s"Unsupported factory: $key")
+            throw new IllegalStateException(s"Unsupported factory: $key")
         }
 
         // Keep track of the number of expressions. This is a side-effect in a map...
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index f49018b0c85..455735a1879 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -922,7 +922,8 @@ private[hive] trait HiveInspectors {
     case Literal(_, dt: UserDefinedType[_]) =>
       toInspector(dt.sqlType)
     // We will enumerate all of the possible constant expressions, throw exception if we missed
-    case Literal(_, dt) => sys.error(s"Hive doesn't support the constant type [$dt].")
+    case Literal(_, dt) =>
+      throw new IllegalStateException(s"Hive doesn't support the constant type [$dt].")
     // ideally, we don't test the foldable here(but in optimizer), however, some of the
     // Hive UDF / UDAF requires its argument to be constant objectinspector, we do it eagerly.
     case _ if expr.foldable => toInspector(Literal.create(expr.eval(), expr.dataType))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 3dddca84475..d70ac781c03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -801,7 +801,7 @@ private[hive] class HiveClientImpl(
     val maxResults = 100000
     val results = runHive(sql, maxResults)
     // It is very confusing when you only get back some of the results...
-    if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
+    if (results.size == maxResults) throw new IllegalStateException("RESULTS POSSIBLY TRUNCATED")
     results
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
