[
https://issues.apache.org/jira/browse/SPARK-16613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15383836#comment-15383836
]
Sean Owen commented on SPARK-16613:
-----------------------------------
Interesting. I think an issue here is that the semantics of RDD.pipe() are
unclear. Input and output are RDD[String], and it looks as though each element
of the output should be the stdout of running the command on a single input
element. In reality, the process is run once per partition, and each input
element is sent to its stdin, separated by newlines. The lines of its stdout
become the output elements for that partition -- which means that a process
that prints many lines of output yields many elements in the resulting RDD.
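As a concrete illustration (my own sketch, not from the report), the per-partition behavior can be simulated in plain shell: each partition's elements become the stdin of one invocation of the command, here `wc -l`:

```shell
# Sketch of RDD.pipe()'s per-partition behavior, assuming the
# 5-element RDD is split across two partitions: each partition's
# elements feed one invocation of the command via stdin.
printf 'hi\nhello\n' | wc -l        # partition 1: two elements
printf 'how\nare\nyou\n' | wc -l    # partition 2: three elements
```

Each invocation prints one count, so the resulting RDD has one element per partition rather than one per input element.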
Right now, the process is still invoked even for an empty partition; for this
script, `read` sees no input, so the length correctly comes out as "0" -- which
is where the zeros in the reported output come from.
The docs do say "pipes elements to an external process" which sort of implies
the current behavior.
I think we should, in any event, clarify docs and probably also modify the
behavior to output nothing for an empty partition -- not even the result of the
process when presented with no input.
I'm reluctant to change the semantics of the method to run one process per
input element, even if that strikes me as more logical. That leaves us stuck
with the fact that outputs can't necessarily be matched 1:1 with inputs, but
that's just a constraint on the kind of command you can use with this, I guess.
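For commands that do need 1:1 input/output, the usual workaround is to loop over every line of stdin instead of reading just one. A minimal sketch of such a len.sh variant (my own assumption, not part of the report), written as a function so it is self-contained:

```shell
# Sketch: emit one length per input line. An empty partition
# sends no lines, so the loop body never runs and nothing is
# printed -- no spurious zeros even under the current behavior
# of invoking the process on empty partitions.
line_lengths() {
  while IFS= read -r input; do
    echo "${#input}"
  done
}
```

With this shape of script, each output line corresponds to exactly one input element, and empty partitions contribute nothing.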
CC [~andrewor14] if available, but more so [~tejasp]
> A bug in RDD pipe operation
> ---------------------------
>
> Key: SPARK-16613
> URL: https://issues.apache.org/jira/browse/SPARK-16613
> Project: Spark
> Issue Type: Bug
> Reporter: Alex Krasnyansky
>
> Suppose we have such Spark code
> {code}
> import org.apache.spark.{SparkConf, SparkContext}
>
> object PipeExample {
>   def main(args: Array[String]) {
>     val sc = new SparkContext(new SparkConf().setAppName("PipeExample"))
>     val fstRdd = sc.parallelize(List("hi", "hello", "how", "are", "you"))
>     val pipeRdd =
>       fstRdd.pipe("/Users/finkel/spark-pipe-example/src/main/resources/len.sh")
>     pipeRdd.collect.foreach(println)
>   }
> }
> {code}
> It uses a shell script to convert each string to its length.
> {code}
> #!/bin/sh
> # Reads only the first line of whatever arrives on stdin.
> read input
> len=${#input}
> echo "$len"
> {code}
> So far so good, but when I run the code, it prints incorrect output. For
> example:
> {code}
> 0
> 2
> 0
> 5
> 3
> 0
> 3
> 3
> {code}
> I expect to see
> {code}
> 2
> 5
> 3
> 3
> 3
> {code}
> which is the correct output for the app. I think it's a bug: the output
> should contain only the five positive lengths, with no zeros.
> Environment:
> 1. Spark version is 1.6.2
> 2. Scala version is 2.11.6
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)