Repository: spark
Updated Branches:
  refs/heads/master 01125a116 -> d4c7572db


Move ScriptTransformation into the appropriate place.

Author: Reynold Xin <[email protected]>

Closes #1162 from rxin/script and squashes the following commits:

2c836b9 [Reynold Xin] Move ScriptTransformation into the appropriate place.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4c7572d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4c7572d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4c7572d

Branch: refs/heads/master
Commit: d4c7572dba1be49e55ceb38713652e5bcf485be8
Parents: 01125a1
Author: Reynold Xin <[email protected]>
Authored: Fri Jun 20 17:16:56 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Fri Jun 20 17:16:56 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/ScriptTransformation.scala   | 80 --------------------
 .../hive/execution/ScriptTransformation.scala   | 80 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d4c7572d/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala
deleted file mode 100644
index 8258ee5..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import java.io.{BufferedReader, InputStreamReader}
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.hive.HiveContext
-
-/* Implicit conversions */
-import scala.collection.JavaConversions._
-
-/**
- * :: DeveloperApi ::
- * Transforms the input by forking and running the specified script.
- *
- * @param input the set of expression that should be passed to the script.
- * @param script the command that should be executed.
- * @param output the attributes that are produced by the script.
- */
-@DeveloperApi
-case class ScriptTransformation(
-    input: Seq[Expression],
-    script: String,
-    output: Seq[Attribute],
-    child: SparkPlan)(@transient sc: HiveContext)
-  extends UnaryNode {
-
-  override def otherCopyArgs = sc :: Nil
-
-  def execute() = {
-    child.execute().mapPartitions { iter =>
-      val cmd = List("/bin/bash", "-c", script)
-      val builder = new ProcessBuilder(cmd)
-      val proc = builder.start()
-      val inputStream = proc.getInputStream
-      val outputStream = proc.getOutputStream
-      val reader = new BufferedReader(new InputStreamReader(inputStream))
-
-      // TODO: This should be exposed as an iterator instead of reading in all the data at once.
-      val outputLines = collection.mutable.ArrayBuffer[Row]()
-      val readerThread = new Thread("Transform OutputReader") {
-        override def run() {
-          var curLine = reader.readLine()
-          while (curLine != null) {
-            // TODO: Use SerDe
-            outputLines += new GenericRow(curLine.split("\t").asInstanceOf[Array[Any]])
-            curLine = reader.readLine()
-          }
-        }
-      }
-      readerThread.start()
-      val outputProjection = new Projection(input)
-      iter
-        .map(outputProjection)
-        // TODO: Use SerDe
-        .map(_.mkString("", "\t", "\n").getBytes("utf-8")).foreach(outputStream.write)
-      outputStream.close()
-      readerThread.join()
-      outputLines.toIterator
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d4c7572d/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
new file mode 100644
index 0000000..8258ee5
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.{BufferedReader, InputStreamReader}
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.hive.HiveContext
+
+/* Implicit conversions */
+import scala.collection.JavaConversions._
+
+/**
+ * :: DeveloperApi ::
+ * Transforms the input by forking and running the specified script.
+ *
+ * @param input the set of expression that should be passed to the script.
+ * @param script the command that should be executed.
+ * @param output the attributes that are produced by the script.
+ */
+@DeveloperApi
+case class ScriptTransformation(
+    input: Seq[Expression],
+    script: String,
+    output: Seq[Attribute],
+    child: SparkPlan)(@transient sc: HiveContext)
+  extends UnaryNode {
+
+  override def otherCopyArgs = sc :: Nil
+
+  def execute() = {
+    child.execute().mapPartitions { iter =>
+      val cmd = List("/bin/bash", "-c", script)
+      val builder = new ProcessBuilder(cmd)
+      val proc = builder.start()
+      val inputStream = proc.getInputStream
+      val outputStream = proc.getOutputStream
+      val reader = new BufferedReader(new InputStreamReader(inputStream))
+
+      // TODO: This should be exposed as an iterator instead of reading in all the data at once.
+      val outputLines = collection.mutable.ArrayBuffer[Row]()
+      val readerThread = new Thread("Transform OutputReader") {
+        override def run() {
+          var curLine = reader.readLine()
+          while (curLine != null) {
+            // TODO: Use SerDe
+            outputLines += new GenericRow(curLine.split("\t").asInstanceOf[Array[Any]])
+            curLine = reader.readLine()
+          }
+        }
+      }
+      readerThread.start()
+      val outputProjection = new Projection(input)
+      iter
+        .map(outputProjection)
+        // TODO: Use SerDe
+        .map(_.mkString("", "\t", "\n").getBytes("utf-8")).foreach(outputStream.write)
+      outputStream.close()
+      readerThread.join()
+      outputLines.toIterator
+    }
+  }
+}
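
The operator moved above is the physical node behind HiveQL's SELECT TRANSFORM ... USING 'script' syntax, and its execute() method follows a simple fork-and-pipe pattern: project each input row to a tab-separated line, write it to the child process's stdin, and drain the process's stdout on a separate thread so neither side blocks on a full pipe. The sketch below is not part of this commit; the object and method names are invented for illustration, and it strips out the Catalyst Row/Projection machinery to show the pattern in isolation.

/*
 * Minimal sketch (assumption, not from this commit) of the fork-and-pipe
 * pattern used by ScriptTransformation.execute(): feed input lines to an
 * external script over stdin and collect its stdout on a reader thread.
 */
import java.io.{BufferedReader, InputStreamReader}

object ScriptPipeSketch {

  def pipeThroughScript(lines: Iterator[String], script: String): Iterator[String] = {
    // Fork the script under bash, as ScriptTransformation does.
    val proc = new ProcessBuilder("/bin/bash", "-c", script).start()
    val reader = new BufferedReader(new InputStreamReader(proc.getInputStream))
    val out = proc.getOutputStream

    // Drain stdout on its own thread so the child never blocks on a full
    // pipe while we are still writing input to it.
    val collected = collection.mutable.ArrayBuffer[String]()
    val readerThread = new Thread("sketch-output-reader") {
      override def run(): Unit = {
        var line = reader.readLine()
        while (line != null) {
          collected += line
          line = reader.readLine()
        }
      }
    }
    readerThread.start()

    // Write every input line to the script's stdin, then close it so the
    // script sees EOF and exits.
    lines.foreach(line => out.write((line + "\n").getBytes("utf-8")))
    out.close()
    readerThread.join()
    collected.iterator
  }

  def main(args: Array[String]): Unit = {
    // Example: upper-case two tab-separated rows through an external command.
    pipeThroughScript(Iterator("a\tb", "c\td"), "tr '[:lower:]' '[:upper:]'")
      .foreach(println)
  }
}

As in the committed code, the reader thread buffers the entire script output before returning it; the TODO in the diff notes that this should eventually be exposed as a streaming iterator and use a SerDe instead of raw tab-delimited text.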
