Hi Charles, I tried this with dummied-out functions that just compute simple aggregates over a list of integers; maybe they could be replaced by your algorithms in your case. The idea is to call them through a "god" function that takes an additional type parameter and delegates to the appropriate function. Here's my code, maybe it helps...
def f0(xs):
    return len(xs)

def f1(xs):
    return sum(xs)

def f2(xs):
    return sum([x**2 for x in xs])

def f_god(n, xs):
    if n == 1:
        return f1(xs)
    elif n == 2:
        return f2(xs)
    else:
        return f0(xs)

xs = [x for x in range(0, 5)]
xs_b = sc.broadcast(xs)
ns = sc.parallelize([x for x in range(0, 3)])
results = ns.map(lambda n: f_god(n, xs_b.value))
print(results.take(10))

gives me:

[5, 10, 30]
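The if/elif chain in f_god can also be written as a dict dispatch; here is a minimal sketch reusing f0/f1/f2 from above (f_god_dict is just an illustrative name, and this variant is not part of the run shown):

dispatch = {1: f1, 2: f2}

def f_god_dict(n, xs):
    # any n without an entry falls back to f0, like the else branch in f_god
    return dispatch.get(n, f0)(xs)

results = ns.map(lambda n: f_god_dict(n, xs_b.value))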
-sujit

On Mon, Mar 28, 2016 at 12:59 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

> You probably want to look at the map transformation, and the many more
> defined on RDDs. The function you pass in to map is serialized and the
> computation is distributed.
>
> On Monday, March 28, 2016, charles li <charles.up...@gmail.com> wrote:
>
>> use case: I have a dataset and want to run different algorithms on it,
>> then fetch the results.
>>
>> To make this work, I think I should distribute my algorithms and run
>> them on the dataset at the same time. Am I right?
>>
>> But it seems that Spark cannot parallelize/serialize
>> algorithms/functions, so how can this be done?
>>
>> *here is the test code*:
>>
>> ------------------------------------------------------------
>> def test():
>>     pass
>>
>> function_list = [test] * 10
>>
>> sc.parallelize([test] * 10).take(1)
>> ------------------------------------------------------------
>>
>> *error message:*
>>
>> Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.runJob.
>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3
>> in stage 9.0 (TID 105, sh-demo-hadoop-07):
>> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
>>     process()
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
>>     serializer.dump_stream(func(split_index, iterator), outfile)
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
>>     vs = list(itertools.islice(iterator, batch))
>>   File "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line 1293, in takeUpToNumLeft
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
>>     yield self._read_with_length(stream)
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
>>     return self.loads(obj)
>>   File "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
>>     return pickle.loads(obj)
>> AttributeError: 'module' object has no attribute 'test'
>>
>>   at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>   at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
>>   at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> What's interesting is that *when I run sc.parallelize([test] * 10).collect(), it works fine* and returns:
>>
>> [<function __main__.test>,
>>  <function __main__.test>,
>>  <function __main__.test>,
>>  <function __main__.test>,
>>  <function __main__.test>,
>>  <function __main__.test>,
>>  <function __main__.test>,
>>  <function __main__.test>,
>>  <function __main__.test>,
>>  <function __main__.test>]
>>
>> --
>> --------------------------------------
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao

--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
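For what it's worth, the collect-vs-take difference above comes down to where the pickled elements get unpickled. parallelize() serializes the RDD elements with the standard pickler, which stores a top-level function as a bare reference (module __main__, name test); collect() resolves that reference back on the driver, where test exists, while take() runs a Python worker on an executor whose __main__ has no such attribute. By contrast, functions passed *to* transformations such as map are serialized with cloudpickle, which ships the function itself, which is why the god-function/dispatch approach works. A minimal sketch of the contrast (assumes an active SparkContext sc; algo_a/algo_b are illustrative names, not from the thread):

def test():
    pass

rdd = sc.parallelize([test] * 10)

# Works: the pickled function references are only resolved back on the
# driver, where __main__.test exists.
rdd.collect()

# Fails: take() spawns a Python worker on an executor, which must unpickle
# the elements there, and its __main__ has no attribute `test`.
# rdd.take(1)   # AttributeError: 'module' object has no attribute 'test'

# Workaround in the spirit of Sujit's reply: ship plain identifiers and
# resolve them to functions inside the closure. The closure (and the dict
# of functions it captures) is serialized with cloudpickle, by value.
def algo_a(xs):
    return len(xs)

def algo_b(xs):
    return sum(xs)

algorithms = {"a": algo_a, "b": algo_b}
data = sc.broadcast([1, 2, 3, 4])
results = (sc.parallelize(list(algorithms))
             .map(lambda k: (k, algorithms[k](data.value))))
print(results.collect())   # e.g. [('a', 4), ('b', 10)]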