Repository: spark
Updated Branches:
  refs/heads/master b9d177c51 -> 2dd7f9308


[SPARK-7862] [SQL] Fix the deadlock in script transformation for stderr

[Related PR SPARK-7044] (https://github.com/apache/spark/pull/5671)

Author: zhichao.li <[email protected]>

Closes #6404 from zhichao-li/transform and squashes the following commits:

8418c97 [zhichao.li] add comments and remove useless failAfter logic
d9677e1 [zhichao.li] redirect the error destination to be the same as the 
current process


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2dd7f930
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2dd7f930
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2dd7f930

Branch: refs/heads/master
Commit: 2dd7f93080ee882afcc2aac1a419802a19a668ce
Parents: b9d177c
Author: zhichao.li <[email protected]>
Authored: Thu Jun 11 22:28:28 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Thu Jun 11 22:28:28 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/execution/ScriptTransformation.scala |  7 +++++++
 .../apache/spark/sql/hive/execution/SQLQuerySuite.scala | 12 ++++++++++--
 2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2dd7f930/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index fd62337..28792db 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import java.io.{BufferedReader, DataInputStream, DataOutputStream, 
EOFException, InputStreamReader}
+import java.lang.ProcessBuilder.Redirect
 import java.util.Properties
 
 import scala.collection.JavaConversions._
@@ -58,6 +59,12 @@ case class ScriptTransformation(
     child.execute().mapPartitions { iter =>
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd)
+      // redirectError(Redirect.INHERIT) would consume the error output from 
buffer and
+      // then print it to stderr (inherit the target from the current Scala 
process).
+      // Without this there would be 2 issues:
+      // 1) The error msg generated by the script process would be hidden.
+      // 2) If the error msg is big enough to fill up the buffer, the input 
logic would hang
+      builder.redirectError(Redirect.INHERIT)
       val proc = builder.start()
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream

http://git-wip-us.apache.org/repos/asf/spark/blob/2dd7f930/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 8bd4900..c8e5e24 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -645,12 +645,20 @@ class SQLQuerySuite extends QueryTest {
       .queryExecution.analyzed
   }
 
-  test("test script transform") {
+  test("test script transform for stdout") {
     val data = (1 to 100000).map { i => (i, i, i) }
     data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
     assert(100000 ===
       sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM 
script_trans")
-      .queryExecution.toRdd.count())
+        .queryExecution.toRdd.count())
+  }
+
+  test("test script transform for stderr") {
+    val data = (1 to 100000).map { i => (i, i, i) }
+    data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
+    assert(0 ===
+      sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM 
script_trans")
+        .queryExecution.toRdd.count())
   }
 
   test("window function: udaf with aggregate expressin") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to