Hello, everyone!

I'm trying to implement association rules in Python. I have managed to
implement association by frequent items, which works as expected (the
example can be seen here
<https://github.com/mrcaique/spark/blob/master/examples/src/main/python/mllib/fpgrowth_example.py#L36-L40>).


Now my challenge is to implement it for a custom RDD. I have studied the
structure of Spark and how it implements the Python functions for its
machine learning algorithms. My implementation can be seen in this fork
<https://github.com/mrcaique/spark>.

The example that uses a custom RDD for association rules can be seen here
<https://github.com/mrcaique/spark/blob/master/examples/src/main/python/mllib/association_rules_example.py>.
At line 33, the output is:

MapPartitionsRDD[10] at mapPartitions at PythonMLLibAPI.scala:1533

That looks fine: when I test the Scala example, the structure returned is
also a MapPartitions. But when I try to call *foreach* on this collection,
I get:

net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
    at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$2.apply(PythonMLLibAPI.scala:1547)
    at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$2.apply(PythonMLLibAPI.scala:1546)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:77)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:45)
    at org.apache.spark.scheduler.Task.run(Task.scala:81)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

What is this exception, and what does it mean? Any help or tips are welcome.
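
One possible reading of the trace, offered as an assumption and not a
confirmed diagnosis: the mention of numpy.core.multiarray._reconstruct
suggests that the pickled RDD elements contain NumPy values, which the
JVM-side unpickler (net.razorvine.pickle) does not reconstruct by default.
If that is the case, one thing to try is converting the elements to
built-in Python types before they reach the JVM. The sketch below is only
an illustration: to_plain is a hypothetical helper, not part of Spark, and
array.array stands in for numpy.ndarray (both expose tolist()) so that the
snippet runs without NumPy installed.

```python
import pickle
from array import array


def to_plain(value):
    """Recursively replace array-like values with built-in Python types.

    Hypothetical helper, not part of Spark or NumPy.
    """
    # numpy.ndarray and NumPy scalars expose tolist(); so does array.array,
    # which is used below as a stand-in.
    if hasattr(value, "tolist"):
        return value.tolist()
    if isinstance(value, (list, tuple)):
        return [to_plain(v) for v in value]
    return value


# Made-up record; array(...) plays the role of a NumPy array here.
record = ["a", array("i", [1, 2]), ("b", array("d", [0.5]))]
plain = to_plain(record)
print(plain)  # ['a', [1, 2], ['b', [0.5]]]

# The converted record pickles to built-in types only.
payload = pickle.dumps(plain, protocol=2)
assert pickle.loads(payload) == plain
```

In PySpark this could be applied with something like rdd.map(to_plain)
before the RDD is handed to the MLlib wrapper; whether that removes the
exception in this particular case is untested.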

Thanks,
Caique.
