Hi Charles,

I tried this with dummied-out functions that just sum transformations of a
list of integers; maybe they could be replaced by your algorithms. The idea
is to call them through a "god" function that takes an additional type
parameter and delegates to the appropriate function. Here's my code; maybe
it helps...

def f0(xs):
  return len(xs)

def f1(xs):
  return sum(xs)

def f2(xs):
  return sum([x**2 for x in xs])

def f_god(n, xs):
  if n == 1:
    return f1(xs)
  elif n == 2:
    return f2(xs)
  else:
    return f0(xs)

xs = [x for x in range(0, 5)]
xs_b = sc.broadcast(xs)
ns = sc.parallelize([x for x in range(0, 3)])
results = ns.map(lambda n: f_god(n, xs_b.value))
print results.take(10)


gives me:

[5, 10, 30]
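
If the list of algorithms grows, a dict-based dispatch may be nicer than the
if/elif chain. Just a sketch on top of the same f0/f1/f2, ns and xs_b defined
above (the names dispatch and f_god2 are only illustrative, and I haven't run
this variant):

# map type codes to functions so adding an algorithm is one more entry
dispatch = {0: f0, 1: f1, 2: f2}

def f_god2(n, xs):
  # fall back to f0 for unknown codes, like the else branch above
  return dispatch.get(n, f0)(xs)

results2 = ns.map(lambda n: f_god2(n, xs_b.value))
print results2.take(10)
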
-sujit


On Mon, Mar 28, 2016 at 12:59 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

> You probably want to look at the map transformation, and the many other
> transformations defined on RDDs. The function you pass to map is serialized
> and the computation is distributed.
>
>
> On Monday, March 28, 2016, charles li <charles.up...@gmail.com> wrote:
>
>>
>> Use case: I have a dataset and want to run different algorithms on it and
>> fetch the results.
>>
>> To do that, I think I should distribute my algorithms and run them on the
>> dataset at the same time. Am I right?
>>
>> But it seems that Spark cannot parallelize/serialize
>> algorithms/functions, so how can I do this?
>>
>>
>> *here is the test code*:
>>
>>
>> ------------------------------------------------------------------------------------------------
>> def test():
>>     pass
>> function_list = [test] * 10
>>
>> sc.parallelize([test] * 10).take(1)
>>
>> ------------------------------------------------------------------------------------------------
>>
>>
>> *error message: *
>> Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.runJob.
>>
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in stage
>> 9.0 (TID 105, sh-demo-hadoop-07):
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>> line 111, in main
>>
>>     process()
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>> line 106, in process
>>
>>     serializer.dump_stream(func(split_index, iterator), outfile)
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 263, in dump_stream
>>
>>     vs = list(itertools.islice(iterator, batch))
>>
>>   File
>> "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line
>> 1293, in takeUpToNumLeft
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 139, in load_stream
>>
>>     yield self._read_with_length(stream)
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 164, in _read_with_length
>>
>>     return self.loads(obj)
>>
>>   File
>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 422, in loads
>>
>>     return pickle.loads(obj)
>>
>> AttributeError: 'module' object has no attribute 'test'
>>
>>
>> at
>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>
>> at
>> org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
>>
>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>
>> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> What's interesting is that *when I run sc.parallelize([test] * 10).collect(),
>> it works fine*; it returns:
>>
>> [<function __main__.test>,
>>
>>  <function __main__.test>,
>>
>>  <function __main__.test>,
>>
>>  <function __main__.test>,
>>
>>  <function __main__.test>,
>>
>>  <function __main__.test>,
>>
>>  <function __main__.test>,
>>
>>  <function __main__.test>,
>>
>>  <function __main__.test>,
>>
>>  <function __main__.test>]
>>
>>
>>
>>
>> --
>> --------------------------------------
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>
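
A note on why the quoted snippet fails: parallelize() treats the functions as
data and pickles them by reference (module name + attribute name), so the
workers try to look up __main__.test and can't find it -- hence the
AttributeError. collect() on the untransformed RDD only appears to work
because the pickled bytes are shipped straight back to the driver, where test
does exist, while take() runs a Python worker that has to unpickle them on
the executor. Functions you pass *into* map() go through cloudpickle instead,
which serializes them by value; that is why Holden's suggestion (and the
f_god pattern above) works. Here is a minimal sketch of that pattern -- the
names algo_a, algo_b and run_one are made up, it assumes an existing
SparkContext sc, and I haven't run this exact snippet:

def algo_a(data):
  return min(data)

def algo_b(data):
  return max(data)

# keep the algorithms on the driver; ship only plain indices as RDD elements
algorithms = [algo_a, algo_b]
data_b = sc.broadcast(range(100))

def run_one(i):
  # the functions travel inside this closure via cloudpickle, not as RDD data
  return algorithms[i](data_b.value)

results = sc.parallelize(range(len(algorithms))).map(run_one).collect()
print results  # expect [0, 99]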
