[
https://issues.apache.org/jira/browse/SPARK-20803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16177855#comment-16177855
]
Alessio Placitelli commented on SPARK-20803:
--------------------------------------------
I can still reproduce this issue with Spark 2.2.0, and was able to create a
smaller test case that reproduces it 100% of the time:
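{code}
import numpy as np
from pyspark.mllib.stat import KernelDensity

# `sc` is an existing SparkContext (e.g. the one provided by the pyspark shell)
kd = KernelDensity()
# kd.setSample(sc.parallelize([0.0, 1.0, 2.0, 3.0])) # THIS WORKS
kd.setSample(sc.parallelize([0.0, np.float32(1.0), 2.0, 3.0])) # THIS FAILS
kd.setBandwidth(0.35)
kd.estimate([0.0, 1.0])
{code}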
If the source RDD contains any values produced by numpy or scipy (for example,
numpy scalar types such as np.float32), _KernelDensity_ fails.
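Below is a minimal workaround sketch. It assumes the failure is caused by numpy scalar types reaching the JVM-side unpickler, since the reported `PickleException` mentions `numpy.dtype`. The idea is to convert every sample to a built-in Python `float` before building the RDD. The helper name `to_python_floats` is hypothetical and is not part of PySpark.

```python
from typing import Iterable, List, SupportsFloat


def to_python_floats(values: Iterable[SupportsFloat]) -> List[float]:
    """Return a list containing only built-in Python floats.

    float() accepts any object implementing __float__, including numpy
    scalars such as np.float32 and np.float64, and always returns an exact
    built-in float (never a subclass), so no numpy types survive.
    """
    return [float(v) for v in values]
```

Usage, assuming an existing SparkContext `sc`: `kd.setSample(sc.parallelize(to_python_floats([0.0, np.float32(1.0), 2.0, 3.0])))`. For data that is already in an RDD, `rdd.map(float)` applies the same conversion on the workers instead of the driver.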
> KernelDensity.estimate in pyspark.mllib.stat.KernelDensity throws
> net.razorvine.pickle.PickleException when input data is normally distributed
> (no error when data is not normally distributed)
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-20803
> URL: https://issues.apache.org/jira/browse/SPARK-20803
> Project: Spark
> Issue Type: Bug
> Components: MLlib, PySpark
> Affects Versions: 2.1.1
> Environment: Linux version 4.4.14-smp
> x86/fpu: Legacy x87 FPU detected.
> using command line:
> bash-4.3$ ./bin/spark-submit ~/work/python/Features.py
> bash-4.3$ pwd
> /home/bsrsharma/spark-2.1.1-bin-hadoop2.7
> export JAVA_HOME=/home/bsrsharma/jdk1.8.0_121
> Reporter: Bettadapura Srinath Sharma
>
> When data is NOT normally distributed (correct behavior):
> This code:
> vecRDD = sc.parallelize(colVec)
> kd = KernelDensity()
> kd.setSample(vecRDD)
> kd.setBandwidth(3.0)
> # Find density estimates for the given values
> densities = kd.estimate(samplePoints)
> produces:
> 17/05/18 15:40:36 INFO SparkContext: Starting job: aggregate at KernelDensity.scala:92
> 17/05/18 15:40:36 INFO DAGScheduler: Got job 21 (aggregate at KernelDensity.scala:92) with 1 output partitions
> 17/05/18 15:40:36 INFO DAGScheduler: Final stage: ResultStage 24 (aggregate at KernelDensity.scala:92)
> 17/05/18 15:40:36 INFO DAGScheduler: Parents of final stage: List()
> 17/05/18 15:40:36 INFO DAGScheduler: Missing parents: List()
> 17/05/18 15:40:36 INFO DAGScheduler: Submitting ResultStage 24 (MapPartitionsRDD[44] at mapPartitions at PythonMLLibAPI.scala:1345), which has no missing parents
> 17/05/18 15:40:36 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 6.6 KB, free 413.6 MB)
> 17/05/18 15:40:36 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 3.6 KB, free 413.6 MB)
> 17/05/18 15:40:36 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 192.168.0.115:38645 (size: 3.6 KB, free: 413.9 MB)
> 17/05/18 15:40:36 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:996
> 17/05/18 15:40:36 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 24 (MapPartitionsRDD[44] at mapPartitions at PythonMLLibAPI.scala:1345)
> 17/05/18 15:40:36 INFO TaskSchedulerImpl: Adding task set 24.0 with 1 tasks
> 17/05/18 15:40:36 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID 24, localhost, executor driver, partition 0, PROCESS_LOCAL, 96186 bytes)
> 17/05/18 15:40:36 INFO Executor: Running task 0.0 in stage 24.0 (TID 24)
> 17/05/18 15:40:37 INFO PythonRunner: Times: total = 66, boot = -1831, init = 1844, finish = 53
> 17/05/18 15:40:37 INFO Executor: Finished task 0.0 in stage 24.0 (TID 24). 2476 bytes result sent to driver
> 17/05/18 15:40:37 INFO DAGScheduler: ResultStage 24 (aggregate at KernelDensity.scala:92) finished in 1.001 s
> 17/05/18 15:40:37 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID 24) in 1004 ms on localhost (executor driver) (1/1)
> 17/05/18 15:40:37 INFO TaskSchedulerImpl: Removed TaskSet 24.0, whose tasks have all completed, from pool
> 17/05/18 15:40:37 INFO DAGScheduler: Job 21 finished: aggregate at KernelDensity.scala:92, took 1.136263 s
> 17/05/18 15:40:37 INFO BlockManagerInfo: Removed broadcast_25_piece0 on 192.168.0.115:38645 in memory (size: 3.6 KB, free: 413.9 MB)
> 5.6654703477e-05,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001
> ,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> But if the data IS normally distributed, I see:
> 17/05/18 15:50:16 ERROR Executor: Exception in task 0.0 in stage 24.0 (TID 24)
> net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
> In Scala, the correct result is:
> Code:
> import org.apache.spark.mllib.stat.KernelDensity
>
> val vecRDD = sc.parallelize(colVec)
> val kd = new KernelDensity().setSample(vecRDD).setBandwidth(3.0)
>
> // Find density estimates for the given values
> val densities = kd.estimate(samplePoints)
> [0.04113814235801906,1.0994865517293571E-163,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,]
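The report does not say how the normally distributed `colVec` was generated. Suppose it came from numpy (for example `np.random.normal`; this is an assumption). Its elements would then be `numpy.float64`. That type subclasses Python `float`, so it passes `isinstance(x, float)` checks, but it likely still pickles through numpy's own reduction. Below is a hypothetical diagnostic helper, `find_non_builtin_floats`, not part of PySpark, that lists every element whose type is not exactly `float`:

```python
from typing import Iterable, List, Tuple


def find_non_builtin_floats(values: Iterable[object]) -> List[Tuple[int, str]]:
    """Return (index, type name) for each element whose type is not exactly float.

    Uses an exact type check rather than isinstance(), so float subclasses
    (such as numpy.float64) are reported too, along with ints, bools, etc.
    """
    return [
        (i, type(v).__name__)
        for i, v in enumerate(values)
        if type(v) is not float
    ]
```

For example, `find_non_builtin_floats(colVec)[:5]` shows the first offending positions. An empty list means the sample holds only built-in floats.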