This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f5cf11c  [SPARK-30973][SQL] ScriptTransformationExec should wait for 
the termination …
f5cf11c is described below

commit f5cf11c4d39190f7f5f8a20c8c634c0dc2d6c212
Author: sunke.03 <[email protected]>
AuthorDate: Thu May 14 13:55:24 2020 +0000

    [SPARK-30973][SQL] ScriptTransformationExec should wait for the termination 
…
    
    ### What changes were proposed in this pull request?
    
    This PR try to fix a bug in 
`org.apache.spark.sql.hive.execution.ScriptTransformationExec`. This bug 
appears in our online cluster.  `ScriptTransformationExec` should throw an 
exception, when user uses a python script which contains parse error.  But 
current implementation may miss this case of failure.
    
    ### Why are the changes needed?
    
    When user uses a python script which contains a parse error, there will be 
no output. So  `scriptOutputReader.next(scriptOutputWritable) <= 0` matches, 
then we use `checkFailureAndPropagate()` to check the `proc`.  But the `proc` 
may still be alive and `writerThread.exception` is not defined,  
`checkFailureAndPropagate` cannot check this case of failure.  In the end, the 
Spark SQL job runs successfully and returns no result. In fact, the Spark SQL 
job should fail and shows the except [...]
    
    For example, the erroneous Python script is below.
    ``` python
    # encoding: utf8
    import unknow_module
    import sys
    
    for line in sys.stdin:
        print line
    ```
    The bug can be reproduced by running the following code in our cluster.
    ```
    spark.range(100*100).toDF("index").createOrReplaceTempView("test")
    spark.sql("select TRANSFORM(index) USING 'python error_python.py' as 
new_index from test").collect.foreach(println)
    ```
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing UT
    
    Closes #27724 from slamke/transformation.
    
    Authored-by: sunke.03 <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit ddbce4edee6d4de30e6900bc0f03728a989aef0a)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  9 ++++++
 .../hive/execution/ScriptTransformationExec.scala  | 12 +++++++-
 .../hive/execution/ScriptTransformationSuite.scala | 36 ++++++++++++++++++++++
 3 files changed, 56 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6c18280..31038a0e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2561,6 +2561,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT =
+    buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds")
+      .internal()
+      .doc("Timeout for executor to wait for the termination of transformation 
script when EOF.")
+      .version("3.0.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ > 0, "The timeout value must be positive")
+      .createWithDefault(10L)
+
   /**
    * Holds information about keys that have been deprecated.
    *
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index 40f7b4e..c7183fd 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
 import java.io._
 import java.nio.charset.StandardCharsets
 import java.util.Properties
+import java.util.concurrent.TimeUnit
 import javax.annotation.Nullable
 
 import scala.collection.JavaConverters._
@@ -42,6 +43,7 @@ import 
org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.HiveInspectors
 import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.{CircularBuffer, RedirectThread, 
SerializableConfiguration, Utils}
 
@@ -136,6 +138,15 @@ case class ScriptTransformationExec(
             throw writerThread.exception.get
           }
 
+          // There can be a lag between reader read EOF and the process 
termination.
+          // If the script fails to startup, this kind of error may be missed.
+          // So explicitly waiting for the process termination.
+          val timeout = 
conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_EXIT_TIMEOUT)
+          val exitRes = proc.waitFor(timeout, TimeUnit.SECONDS)
+          if (!exitRes) {
+            log.warn(s"Transformation script process exits timeout in $timeout 
seconds")
+          }
+
           if (!proc.isAlive) {
             val exitCode = proc.exitValue()
             if (exitCode != 0) {
@@ -173,7 +184,6 @@ case class ScriptTransformationExec(
                     // Ideally the proc should *not* be alive at this point but
                     // there can be a lag between EOF being written out and 
the process
                     // being terminated. So explicitly waiting for the process 
to be done.
-                    proc.waitFor()
                     checkFailureAndPropagate()
                     return false
                 }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 7153d3f..b97eb86 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -227,6 +227,42 @@ class ScriptTransformationSuite extends SparkPlanTest with 
SQLTestUtils with Tes
         'e.cast("string")).collect())
     }
   }
+
+  test("SPARK-30973: TRANSFORM should wait for the termination of the script 
(no serde)") {
+    assume(TestUtils.testCommandAvailable("/bin/bash"))
+
+    val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+    val e = intercept[SparkException] {
+      val plan =
+        new ScriptTransformationExec(
+          input = Seq(rowsDf.col("a").expr),
+          script = "some_non_existent_command",
+          output = Seq(AttributeReference("a", StringType)()),
+          child = rowsDf.queryExecution.sparkPlan,
+          ioschema = noSerdeIOSchema)
+      SparkPlanTest.executePlan(plan, hiveContext)
+    }
+    assert(e.getMessage.contains("Subprocess exited with status"))
+    assert(uncaughtExceptionHandler.exception.isEmpty)
+  }
+
+  test("SPARK-30973: TRANSFORM should wait for the termination of the script 
(with serde)") {
+    assume(TestUtils.testCommandAvailable("/bin/bash"))
+
+    val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+    val e = intercept[SparkException] {
+      val plan =
+        new ScriptTransformationExec(
+          input = Seq(rowsDf.col("a").expr),
+          script = "some_non_existent_command",
+          output = Seq(AttributeReference("a", StringType)()),
+          child = rowsDf.queryExecution.sparkPlan,
+          ioschema = serdeIOSchema)
+      SparkPlanTest.executePlan(plan, hiveContext)
+    }
+    assert(e.getMessage.contains("Subprocess exited with status"))
+    assert(uncaughtExceptionHandler.exception.isEmpty)
+  }
 }
 
 private case class ExceptionInjectingOperator(child: SparkPlan) extends 
UnaryExecNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to