Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-24 Thread Davies Liu
Using 0 for spark.mesos.mesosExecutor.cores is better than dynamic
allocation, but you have to pay a little more overhead for launching
each task, which should be OK if the tasks are not trivial.

Since the direct result (up to 1M by default) also goes through
Mesos, it's better to tune it lower, otherwise Mesos could become the
bottleneck.

spark.task.maxDirectResultSize
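
For reference, a minimal sketch of how these two settings could be applied
when building a session (Spark 2.x assumed; the app name and values below are
illustrative, not recommendations):

```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("mesos-fine-grained-example")           # hypothetical app name
         .config("spark.mesos.mesosExecutor.cores", "0")  # no dedicated executor core
         .config("spark.task.maxDirectResultSize", str(512 * 1024))  # 512 KB instead of 1 MB
         .getOrCreate())
```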

On Mon, Dec 19, 2016 at 3:23 PM, Chawla,Sumit  wrote:
> Tim,
>
> We will try to run the application in coarse grain mode, and share the
> findings with you.
>
> Regards
> Sumit Chawla
>
>
> On Mon, Dec 19, 2016 at 3:11 PM, Timothy Chen  wrote:
>
>> Dynamic allocation works with coarse-grained mode only; we weren't aware
>> of a need for it in fine-grained mode after we enabled dynamic allocation
>> support in coarse-grained mode.
>>
>> What's the reason you're running fine grain mode instead of coarse
>> grain + dynamic allocation?
>>
>> Tim
>>
>> On Mon, Dec 19, 2016 at 2:45 PM, Mehdi Meziane
>>  wrote:
>> > We will be interested by the results if you give a try to Dynamic
>> allocation
>> > with mesos !
>> >
>> >
>> > - Original Mail -
>> > From: "Michael Gummelt" 
>> > To: "Sumit Chawla" 
>> > Cc: u...@mesos.apache.org, d...@mesos.apache.org, "User"
>> > , d...@spark.apache.org
>> > Sent: Monday, 19 December 2016 22:42:55 GMT +01:00 Amsterdam / Berlin /
>> > Berne / Rome / Stockholm / Vienna
>> > Subject: Re: Mesos Spark Fine Grained Execution - CPU count
>> >
>> >
>> >> Is this problem of idle executors sticking around solved in Dynamic
>> >> Resource Allocation?  Is there some timeout after which Idle executors
>> can
>> >> just shutdown and cleanup its resources.
>> >
>> > Yes, that's exactly what dynamic allocation does.  But again I have no
>> idea
>> > what the state of dynamic allocation + mesos is.
>> >
>> > On Mon, Dec 19, 2016 at 1:32 PM, Chawla,Sumit 
>> > wrote:
>> >>
>> >> Great.  Makes much better sense now.  What will be reason to have
>> >> spark.mesos.mesosExecutor.cores more than 1, as this number doesn't
>> include
>> >> the number of cores for tasks.
>> >>
>> >> So in my case it seems like 30 CPUs are allocated to executors.  And
>> there
>> >> are 48 tasks so 48 + 30 =  78 CPUs.  And i am noticing this gap of 30 is
>> >> maintained till the last task exits.  This explains the gap.   Thanks
>> >> everyone.  I am still not sure how this number 30 is calculated.  ( Is
>> it
>> >> dynamic based on current resources, or is it some configuration.  I
>> have 32
>> >> nodes in my cluster).
>> >>
>> >> Is this problem of idle executors sticking around solved in Dynamic
>> >> Resource Allocation?  Is there some timeout after which Idle executors
>> can
>> >> just shutdown and cleanup its resources.
>> >>
>> >>
>> >> Regards
>> >> Sumit Chawla
>> >>
>> >>
>> >> On Mon, Dec 19, 2016 at 12:45 PM, Michael Gummelt <
>> mgumm...@mesosphere.io>
>> >> wrote:
>> >>>
>> >>> >  I should preassume that No of executors should be less than number
>> of
>> >>> > tasks.
>> >>>
>> >>> No.  Each executor runs 0 or more tasks.
>> >>>
>> >>> Each executor consumes 1 CPU, and each task running on that executor
>> >>> consumes another CPU.  You can customize this via
>> >>> spark.mesos.mesosExecutor.cores
>> >>> (https://github.com/apache/spark/blob/v1.6.3/docs/running-on-mesos.md)
>> and
>> >>> spark.task.cpus
>> >>> (https://github.com/apache/spark/blob/v1.6.3/docs/configuration.md)
>> >>>
>> >>> On Mon, Dec 19, 2016 at 12:09 PM, Chawla,Sumit > >
>> >>> wrote:
>> 
>>  Ah thanks. looks like i skipped reading this "Neither will executors
>>  terminate when they’re idle."
>> 
>>  So in my job scenario,  I should preassume that No of executors should
>>  be less than number of tasks. Ideally one executor should execute 1
>> or more
>>  tasks.  But i am observing something strange instead.  I start my job
>> with
>>  48 partitions for a spark job. In mesos ui i see that number of tasks
>> is 48,
>>  but no. of CPUs is 78 which is way more than 48.  Here i am assuming
>> that 1
>>  CPU is 1 executor.   I am not specifying any configuration to set
>> number of
>>  cores per executor.
>> 
>>  Regards
>>  Sumit Chawla
>> 
>> 
>>  On Mon, Dec 19, 2016 at 11:35 AM, Joris Van Remoortere
>>   wrote:
>> >
>> > That makes sense. From the documentation it looks like the executors
>> > are not supposed to terminate:
>> >
>> > http://spark.apache.org/docs/latest/running-on-mesos.html#
>> fine-grained-deprecated
>> >>
>> >> Note that while Spark tasks in fine-grained will relinquish cores as
>> >> they terminate, they will not relinquish memory, as the JVM does
>> not give
>> >> memory back to the Operating System. Neither will executors
>> terminate when
>> 

Re: UDF with column value comparison fails with PySpark

2016-11-10 Thread Davies Liu
On Thu, Nov 10, 2016 at 11:14 AM, Perttu Ranta-aho  wrote:
> Hello,
>
> I want to create a UDF which modifies one column value depending on the value
> of some other column. But the Python version of the code always fails in the
> column value comparison. Below are simple examples; the Scala version works as
> expected but the Python version throws an exception. Am I missing something
> obvious? As can be seen from the PySpark exception, I'm using Spark 2.0.1.
>
> -Perttu
>
> import org.apache.spark.sql.functions.udf
> val df = spark.createDataFrame(List(("a",1), ("b",2), ("c",
> 3))).withColumnRenamed("_1", "name").withColumnRenamed("_2", "value")
> def myUdf = udf((name: String, value: Int) => {if (name == "c") { value * 2
> } else { value }})
> df.withColumn("udf", myUdf(df("name"), df("value"))).show()
> +----+-----+---+
> |name|value|udf|
> +----+-----+---+
> |   a|    1|  1|
> |   b|    2|  2|
> |   c|    3|  6|
> +----+-----+---+
>
>
> from pyspark.sql.types import StringType, IntegerType
> import pyspark.sql.functions as F
>
> df = sqlContext.createDataFrame((('a',1), ('b',2), ('c', 3)),
> ('name','value'))
>
> def my_udf(name, value):
> if name == 'c':
> return value * 2
> return value
> F.udf(my_udf, IntegerType())

udf = F.udf(my_udf, IntegerType())
df.withColumn("udf", udf(df.name, df.value)).show()

>
> df.withColumn("udf", my_udf(df.name, df.value)).show()
>
> ---
> ValueErrorTraceback (most recent call last)
>  in ()
> > 1 df.withColumn("udf", my_udf(df.name, df.value)).show()
>
>  in my_udf(name, value)
>   3
>   4 def my_udf(name, value):
> > 5 if name == 'c':
>   6 return value * 2
>   7 return value
>
> /home/ec2-user/spark-2.0.1-bin-hadoop2.4/python/pyspark/sql/column.pyc in
> __nonzero__(self)
> 425
> 426 def __nonzero__(self):
> --> 427 raise ValueError("Cannot convert column into bool: please
> use '&' for 'and', '|' for 'or', "
> 428  "'~' for 'not' when building DataFrame
> boolean expressions.")
> 429 __bool__ = __nonzero__
>
> ValueError: Cannot convert column into bool: please use '&' for 'and', '|'
> for 'or', '~' for 'not' when building DataFrame boolean expressions.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: TaskMemoryManager: Failed to allocate a page

2016-10-27 Thread Davies Liu
Using a broadcast join can usually boost performance when you have
enough memory; you should decrease the threshold or even disable it
when there is not enough memory.
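
A hedged sketch of what that looks like from a Spark 2.x session (assuming a
SparkSession named spark; the 10 MB value is only an example):

```
# Disable automatic broadcast joins entirely (-1 means disabled),
# so the join falls back to a shuffle join instead of broadcasting:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Or keep it enabled but with a smaller threshold, e.g. 10 MB:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)
```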

On Thu, Oct 27, 2016 at 1:22 PM, Pietro Pugni <pietro.pu...@gmail.com> wrote:
> Thank you Davies,
> this worked! But what are the consequences of setting 
> spark.sql.autoBroadcastJoinThreshold=0?
> Will it degrade or boost performance?
> Thank you again
>  Pietro
>
>> On 27 Oct 2016, at 18:54, Davies Liu <dav...@databricks.com> wrote:
>>
>> I think this is caused by BroadcastHashJoin try to use more memory
>> than the amount driver have, could you decrease the
>> spark.sql.autoBroadcastJoinThreshold  (-1 or 0  means disable it)?
>>
>> On Thu, Oct 27, 2016 at 9:19 AM, Pietro Pugni <pietro.pu...@gmail.com> wrote:
>>> I’m sorry, here’s the formatted message text:
>>>
>>>
>>>
>>> I'm running an ETL process that joins table1 with other tables (CSV files),
>>> one table at a time (for example table1 with table2, table1 with table3, and
>>> so on). The join is written to a PostgreSQL instance using JDBC.
>>>
>>> The entire process runs successfully if I use table2, table3 and table4. If
>>> I add table5, table6, table7, the process runs successfully with table5,
>>> table6 and table7, but as soon as it reaches table2 it starts displaying a
>>> lot of messages like this:
>>>
>>> 16/10/27 17:33:47 WARN TaskMemoryManager: Failed to allocate a page
>>> (33554432 bytes), try again.
>>> 16/10/27 17:33:47 WARN TaskMemoryManager: Failed to allocate a page
>>> (33554432 bytes), try again.
>>> 16/10/27 17:33:47 WARN TaskMemoryManager: Failed to allocate a page
>>> (33554432 bytes), try again.
>>> ...
>>> 16/10/27 17:33:47 WARN TaskMemoryManager: Failed to allocate a page
>>> (33554432 bytes), try again.
>>> ...
>>> Traceback (most recent call last):
>>>  File "/Volumes/Data/www/beaver/tmp/ETL_Spark/etl.py", line 1200, in
>>> 
>>>
>>>sparkdf2database(flusso['sparkdf'], schema + "." + postgresql_tabella,
>>> "append")
>>>  File "/Volumes/Data/www/beaver/tmp/ETL_Spark/etl.py", line 144, in
>>> sparkdf2database
>>>properties={"ApplicationName":info["nome"] + " - Scrittura della tabella
>>> " + dest, "disableColumnSanitiser":"true", "reWriteBatchedInserts":"true"}
>>>  File
>>> "/Volumes/Data/www/beaver/tmp/ETL_Spark/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
>>> line 762, in jdbc
>>>  File
>>> "/Volumes/Data/www/beaver/tmp/ETL_Spark/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py",
>>> line 1133, in __call__
>>>  File
>>> "/Volumes/Data/www/beaver/tmp/ETL_Spark/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
>>> line 63, in deco
>>>  File
>>> "/Volumes/Data/www/beaver/tmp/ETL_Spark/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py",
>>> line 319, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o301.jdbc.
>>> : org.apache.spark.SparkException: Exception thrown in awaitResult:
>>>at
>>> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
>>>at
>>> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
>>>at
>>> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
>>>at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>>>at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>>>at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>>>at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>at
>>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>>>at
>>> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
>>>at
>>> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
>>>at
>>> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenSemi

Re: Using Hive UDTF in SparkSQL

2016-10-27 Thread Davies Liu
Could you file a JIRA for this bug?

On Thu, Oct 27, 2016 at 3:05 AM, Lokesh Yadav
 wrote:
> Hello
>
> I am trying to use a Hive UDTF function in Spark SQL, but somehow it's not
> working for me as intended and I am not able to understand the behavior.
>
> When I try to register a function like this:
> create temporary function SampleUDTF_01 as
> 'com.fl.experiments.sparkHive.SampleUDTF' using JAR
> 'hdfs:///user/root/sparkHive-1.0.0.jar';
> It successfully registers the function, but gives me a 'not a registered
> function' error when I try to run that function. Also it doesn't show up in
> the list when I do a 'show functions'.
>
> Another case:
> When I try to register the same function as a temporary function using a
> local jar (the hdfs path doesn't work with temporary function, that is weird
> too), it registers, and I am able to successfully run that function as well.
> Another weird thing is that I am not able to drop that function using the
> 'drop function ...' statement, even though the function shows up in the
> function registry.
>
> I am stuck with this, any help would be really appreciated.
> Thanks
>
> Regards,
> Lokesh Yadav

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: TaskMemoryManager: Failed to allocate a page

2016-10-27 Thread Davies Liu
I think this is caused by BroadcastHashJoin trying to use more memory
than the driver has. Could you decrease
spark.sql.autoBroadcastJoinThreshold (-1 or 0 means disable it)?

On Thu, Oct 27, 2016 at 9:19 AM, Pietro Pugni  wrote:
> I’m sorry, here’s the formatted message text:
>
>
>
> I'm running an ETL process that joins table1 with other tables (CSV files),
> one table at a time (for example table1 with table2, table1 with table3, and
> so on). The join is written to a PostgreSQL instance using JDBC.
>
> The entire process runs successfully if I use table2, table3 and table4. If
> I add table5, table6, table7, the process runs successfully with table5,
> table6 and table7, but as soon as it reaches table2 it starts displaying a
> lot of messages like this:
>
> 16/10/27 17:33:47 WARN TaskMemoryManager: Failed to allocate a page
> (33554432 bytes), try again.
> 16/10/27 17:33:47 WARN TaskMemoryManager: Failed to allocate a page
> (33554432 bytes), try again.
> 16/10/27 17:33:47 WARN TaskMemoryManager: Failed to allocate a page
> (33554432 bytes), try again.
> ...
> 16/10/27 17:33:47 WARN TaskMemoryManager: Failed to allocate a page
> (33554432 bytes), try again.
> ...
> Traceback (most recent call last):
>   File "/Volumes/Data/www/beaver/tmp/ETL_Spark/etl.py", line 1200, in
> 
>
> sparkdf2database(flusso['sparkdf'], schema + "." + postgresql_tabella,
> "append")
>   File "/Volumes/Data/www/beaver/tmp/ETL_Spark/etl.py", line 144, in
> sparkdf2database
> properties={"ApplicationName":info["nome"] + " - Scrittura della tabella
> " + dest, "disableColumnSanitiser":"true", "reWriteBatchedInserts":"true"}
>   File
> "/Volumes/Data/www/beaver/tmp/ETL_Spark/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
> line 762, in jdbc
>   File
> "/Volumes/Data/www/beaver/tmp/ETL_Spark/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py",
> line 1133, in __call__
>   File
> "/Volumes/Data/www/beaver/tmp/ETL_Spark/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 63, in deco
>   File
> "/Volumes/Data/www/beaver/tmp/ETL_Spark/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py",
> line 319, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o301.jdbc.
> : org.apache.spark.SparkException: Exception thrown in awaitResult:
> at
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
> at
> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
> at
> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> at
> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
> at
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
> at
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenSemi(BroadcastHashJoinExec.scala:318)
> at
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:84)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
> at
> org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
> at
> org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:194)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
> at
> org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:150)
> at
> org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(ExistingRDD.scala:217)
> at
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
> at
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
> at
> org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:150)
> at
> 

Re: Re[8]: Spark 2.0: SQL runs 5x times slower when adding 29th field to aggregation.

2016-09-06 Thread Davies Liu
I think the slowness is caused by the generated aggregate method having
more than 8K bytecodes, so it is not JIT-compiled and becomes much slower.

Could you try disabling DontCompileHugeMethods by passing:

-XX:-DontCompileHugeMethods
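
One hedged way to pass the flag is through the extraJavaOptions settings
(illustrative sketch; note that the driver option only takes effect if it is
set before the driver JVM starts, e.g. in spark-defaults.conf or on the
spark-submit command line):

```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # executors pick this up when they are launched
         .config("spark.executor.extraJavaOptions", "-XX:-DontCompileHugeMethods")
         # for the driver this must be set before its JVM starts; shown here only for completeness
         .config("spark.driver.extraJavaOptions", "-XX:-DontCompileHugeMethods")
         .getOrCreate())
```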

On Mon, Sep 5, 2016 at 4:21 AM, Сергей Романов
 wrote:
> Hi, Gavin,
>
> Shuffling is exactly the same in both requests and is minimal. Both requests
> produce one shuffle task. Running time is the only difference I can see in
> the metrics:
>
> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
> schema=schema).groupBy().sum(*(['dd_convs'] * 57) ).collect, number=1)
> 0.713730096817
>  {
> "id" : 368,
> "name" : "duration total (min, med, max)",
> "value" : "524"
>   }, {
> "id" : 375,
> "name" : "internal.metrics.executorRunTime",
> "value" : "527"
>   }, {
> "id" : 391,
> "name" : "internal.metrics.shuffle.write.writeTime",
> "value" : "244495"
>   }
>
> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
> schema=schema).groupBy().sum(*(['dd_convs'] * 58) ).collect, number=1)
> 2.97951102257
>
>   }, {
> "id" : 469,
> "name" : "duration total (min, med, max)",
> "value" : "2654"
>   }, {
> "id" : 476,
> "name" : "internal.metrics.executorRunTime",
> "value" : "2661"
>   }, {
> "id" : 492,
> "name" : "internal.metrics.shuffle.write.writeTime",
> "value" : "371883"
>   }, {
>
> Full metrics in attachment.
>
> Saturday, 3 September 2016, 19:53 +03:00 from Gavin Yue
> :
>
>
> Any shuffling?
>
>
> On Sep 3, 2016, at 5:50 AM, Сергей Романов 
> wrote:
>
> Same problem happens with CSV data file, so it's not parquet-related either.
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>       /_/
>
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
 import timeit
 from pyspark.sql.types import *
 schema = StructType([StructField('dd_convs', FloatType(), True)])
 for x in range(50, 70): print x,
 timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
 schema=schema).groupBy().sum(*(['dd_convs'] * x) ).collect, number=1)
> 50 0.372850894928
> 51 0.376906871796
> 52 0.381325960159
> 53 0.385444164276
> 54 0.38685192
> 55 0.388918161392
> 56 0.397624969482
> 57 0.391713142395
> 58 2.62714004517
> 59 2.68421196938
> 60 2.74627685547
> 61 2.81081581116
> 62 3.43532109261
> 63 3.07742786407
> 64 3.03904604912
> 65 3.01616096497
> 66 3.06293702126
> 67 3.09386610985
> 68 3.27610206604
> 69 3.2041969299
>
> Saturday, 3 September 2016, 15:40 +03:00 from Сергей Романов
> :
>
> Hi,
>
> I had narrowed down my problem to a very simple case. I'm sending 27kb
> parquet in attachment. (file:///data/dump/test2 in example)
>
> Please, can you take a look at it? Why is there a performance drop after 57
> sum columns?
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>       /_/
>
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
 import timeit
 for x in range(70): print x,
 timeit.timeit(spark.read.parquet('file:///data/dump/test2').groupBy().sum(*(['dd_convs']
 * x) ).collect, number=1)
> ...
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> 0 1.05591607094
> 1 0.200426101685
> 2 0.203800916672
> 3 0.176458120346
> 4 0.184863805771
> 5 0.232321023941
> 6 0.216032981873
> 7 0.201778173447
> 8 0.292424917221
> 9 0.228524923325
> 10 0.190534114838
> 11 0.197028160095
> 12 0.270443916321
> 13 0.429781913757
> 14 0.270851135254
> 15 0.776989936829
> 16 0.27879181
> 17 0.227638959885
> 18 0.212944030762
> 19 0.2144780159
> 20 0.22200012207
> 21 0.262261152267
> 22 0.254227876663
> 23 0.275084018707
> 24 0.292124032974
> 25 0.280488014221
> 16/09/03 15:31:28 WARN Utils: Truncated the string representation of a plan
> since it was too large. This behavior can be adjusted by setting
> 'spark.debug.maxToStringFields' in SparkEnv.conf.
> 26 0.290093898773
> 27 0.238478899002
> 28 0.246420860291
> 29 0.241401195526
> 30 0.255286931992
> 31 0.42702794075
> 32 0.327946186066
> 33 0.434395074844
> 34 0.314198970795
> 35 0.34576010704
> 36 0.278323888779
> 37 0.289474964142
> 38 0.290827989578
> 39 0.376291036606
> 40 0.347742080688
> 41 0.363158941269
> 42 0.318687915802
> 43 0.376327991486
> 44 0.374994039536
> 45 0.362971067429
> 46 0.425967931747
> 47 0.370860099792
> 48 0.443903923035
> 49 0.374128103256
> 50 0.378985881805
> 51 0.476850986481
> 52 0.451028823853
> 53 0.432540893555
> 54 0.514838933945
> 55 0.53990483284
> 56 

Re: Is cache() still necessary for Spark DataFrames?

2016-09-02 Thread Davies Liu
Caching an RDD/DataFrame always has some cost. In this case, I'd suggest
not caching the DataFrame; first() is usually fast enough (it only computes
the partitions as needed).

On Fri, Sep 2, 2016 at 1:05 PM, apu  wrote:
> When I first learnt Spark, I was told that cache() is desirable anytime one
> performs more than one Action on an RDD or DataFrame. For example, consider
> the PySpark toy example below; it shows two approaches to doing the same
> thing.
>
> # Approach 1 (bad?)
> df2 = someTransformation(df1)
> a = df2.count()
> b = df2.first() # This step could take long, because df2 has to be created
> all over again
>
> # Approach 2 (good?)
> df2 = someTransformation(df1)
> df2.cache()
> a = df2.count()
> b = df2.first() # Because df2 is already cached, this action is quick
> df2.unpersist()
>
> The second approach shown above is somewhat clunky, because it requires one
> to cache any dataframe that will be Acted on more than once, followed by the
> need to call unpersist() later to free up memory.
>
> So my question is: is the second approach still necessary/desirable when
> operating on DataFrames in newer versions of Spark (>=1.6)?
>
> Thanks!!
>
> Apu

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-19 Thread Davies Liu
The OOM happens in the driver, so you may also need more memory for the driver.
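
A hedged sketch of raising driver memory (illustrative values; like other
JVM-level settings, spark.driver.memory must be set before the driver JVM
starts, e.g. in spark-defaults.conf or on the spark-submit command line,
rather than from an already-running shell):

```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.driver.memory", "16g")         # more heap for the driver
         .config("spark.driver.maxResultSize", "4g")   # optional: allow larger results back to the driver
         .getOrCreate())
```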

On Fri, Aug 19, 2016 at 2:33 PM, Davies Liu <dav...@databricks.com> wrote:
> You are using lots of tiny executors (128 executor with only 2G
> memory), could you try with bigger executor (for example 16G x 16)?
>
> On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>>
>> So I wrote some code to reproduce the problem.
>>
>> I assume here that a pipeline should be able to transform a categorical 
>> feature with a few million levels.
>> So I create a dataframe with the categorical feature (‘id’), apply a 
>> StringIndexer and OneHotEncoder transformer, and run a loop where I increase 
>> the amount of levels.
>> It breaks at 1.276.000 levels.
>>
>> Shall I report this as a ticket in JIRA?
>>
>> 
>>
>>
>> from pyspark.sql.functions import rand
>> from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
>> from pyspark.ml import Pipeline
>>
>> start_id = 10
>> n = 500
>> step = (n - start_id) / 25
>>
>> for i in xrange(start_id,start_id + n,step):
>> print "#\n {}".format(i)
>> dfr = (sqlContext
>>.range(start_id, start_id + i)
>>.withColumn('label', rand(seed=10))
>>.withColumn('feat2', rand(seed=101))
>> #.withColumn('normal', randn(seed=27))
>>).repartition(32).cache()
>> # dfr.select("id", rand(seed=10).alias("uniform"), 
>> randn(seed=27).alias("normal")).show()
>> dfr.show(1)
>> print "This dataframe has {0} rows (and therefore {0} levels will be one 
>> hot encoded)".format(dfr.count())
>>
>> categorical_feature  = ['id']
>> stages = []
>>
>> for c in categorical_feature:
>> stages.append(StringIndexer(inputCol=c, 
>> outputCol="{}Index".format(c)))
>> stages.append(OneHotEncoder(dropLast= False, inputCol = 
>> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>>
>> columns = ["{}OHE".format(x) for x in categorical_feature]
>> columns.append('feat2')
>>
>> assembler = VectorAssembler(
>> inputCols=columns,
>> outputCol="features")
>> stages.append(assembler)
>>
>> df2 = dfr
>>
>> pipeline = Pipeline(stages=stages)
>> pipeline_fitted = pipeline.fit(df2)
>> df3 = pipeline_fitted.transform(df2)
>> df3.show(1)
>> dfr.unpersist()
>>
>>
>> 
>>
>> Output:
>>
>>
>> #
>>  10
>> +--+---+---+
>> |id|label  |  feat2|
>> +--+---+---+
>> |183601|0.38693226548356197|0.04485291680169634|
>> +--+---+---+
>> only showing top 1 row
>>
>> This dataframe has 10 rows (and therefore 10 levels will be one hot 
>> encoded)
>> +--+---+---+---+++
>> |id|label  |  feat2|idIndex| 
>>   idOHE|features|
>> +--+---+---+---+++
>> |183601|
>> 0.38693226548356197|0.04485291680169634|83240.0|(10,[83240],[...|(11,[83240,10...|
>> +--+---+---+---+++
>> only showing top 1 row
>>
>> #
>>  296000
>> +--+---+---+
>> |id|label  |  feat2|
>> +--+---+---+
>> |137008| 0.2996020619810592|0.38693226548356197|
>> +--+---+---+
>> only showing top 1 row
>>
>> This dataframe has 296000 rows (and therefore 296000 levels will be one hot 
>> encoded)
>> +--+---+---+---+++
>> |id|label  |  feat2|idIndex| 
>>   idOHE|features|
>> +--+---+---+-

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-19 Thread Davies Liu
You are using lots of tiny executors (128 executors with only 2G of
memory each); could you try bigger executors (for example, 16G x 16)?
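
A hedged sketch of that sizing (illustrative values; executor sizing is a
launch-time setting, so it belongs in spark-defaults.conf or on the
spark-submit command line):

```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.executor.instances", "16")   # fewer executors...
         .config("spark.executor.memory", "16g")     # ...each with much more memory
         .config("spark.executor.cores", "4")
         .getOrCreate())
```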

On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen  wrote:
>
> So I wrote some code to reproduce the problem.
>
> I assume here that a pipeline should be able to transform a categorical 
> feature with a few million levels.
> So I create a dataframe with the categorical feature (‘id’), apply a 
> StringIndexer and OneHotEncoder transformer, and run a loop where I increase 
> the amount of levels.
> It breaks at 1.276.000 levels.
>
> Shall I report this as a ticket in JIRA?
>
> 
>
>
> from pyspark.sql.functions import rand
> from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
> from pyspark.ml import Pipeline
>
> start_id = 10
> n = 500
> step = (n - start_id) / 25
>
> for i in xrange(start_id,start_id + n,step):
> print "#\n {}".format(i)
> dfr = (sqlContext
>.range(start_id, start_id + i)
>.withColumn('label', rand(seed=10))
>.withColumn('feat2', rand(seed=101))
> #.withColumn('normal', randn(seed=27))
>).repartition(32).cache()
> # dfr.select("id", rand(seed=10).alias("uniform"), 
> randn(seed=27).alias("normal")).show()
> dfr.show(1)
> print "This dataframe has {0} rows (and therefore {0} levels will be one 
> hot encoded)".format(dfr.count())
>
> categorical_feature  = ['id']
> stages = []
>
> for c in categorical_feature:
> stages.append(StringIndexer(inputCol=c, 
> outputCol="{}Index".format(c)))
> stages.append(OneHotEncoder(dropLast= False, inputCol = 
> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>
> columns = ["{}OHE".format(x) for x in categorical_feature]
> columns.append('feat2')
>
> assembler = VectorAssembler(
> inputCols=columns,
> outputCol="features")
> stages.append(assembler)
>
> df2 = dfr
>
> pipeline = Pipeline(stages=stages)
> pipeline_fitted = pipeline.fit(df2)
> df3 = pipeline_fitted.transform(df2)
> df3.show(1)
> dfr.unpersist()
>
>
> 
>
> Output:
>
>
> #
>  10
> +--+---+---+
> |id|label  |  feat2|
> +--+---+---+
> |183601|0.38693226548356197|0.04485291680169634|
> +--+---+---+
> only showing top 1 row
>
> This dataframe has 10 rows (and therefore 10 levels will be one hot 
> encoded)
> +--+---+---+---+++
> |id|label  |  feat2|idIndex|  
>  idOHE|features|
> +--+---+---+---+++
> |183601|
> 0.38693226548356197|0.04485291680169634|83240.0|(10,[83240],[...|(11,[83240,10...|
> +--+---+---+---+++
> only showing top 1 row
>
> #
>  296000
> +--+---+---+
> |id|label  |  feat2|
> +--+---+---+
> |137008| 0.2996020619810592|0.38693226548356197|
> +--+---+---+
> only showing top 1 row
>
> This dataframe has 296000 rows (and therefore 296000 levels will be one hot 
> encoded)
> +--+---+---+---+++
> |id|label  |  feat2|idIndex|  
>  idOHE|features|
> +--+---+---+---+++
> |137008| 
> 0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
> +--+---+---+---+++
> only showing top 1 row
>
> #
>  492000
> +--+---+---+
> |id|label  |  feat2|
> +--+---+---+
> |534351| 0.9450641392552516|0.23472935141246665|
> +--+---+---+
> only showing top 1 row
>
> This dataframe has 492000 rows (and therefore 492000 levels will be one hot 
> encoded)
> +--+---+---+---+++
> |id|label  |  feat2|idIndex|  
>  idOHE|features|
> +--+---+---+---+++
> |534351| 

Re: Spark SQL concurrent runs fails with java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]

2016-08-19 Thread Davies Liu
The query failed to finish the broadcast within 5 minutes. You could decrease
the broadcast threshold (spark.sql.autoBroadcastJoinThreshold) or
increase the conf spark.sql.broadcastTimeout.
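
A hedged sketch of both options from a Spark 2.x session (assuming a
SparkSession named spark; values are illustrative):

```
# Give the broadcast more time than the 300-second default:
spark.conf.set("spark.sql.broadcastTimeout", 1200)

# Or avoid the broadcast altogether so the plan uses a shuffle join:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```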

On Tue, Jun 28, 2016 at 3:35 PM, Jesse F Chen  wrote:
>
> With the Spark 2.0 build from 0615, when running 4-user concurrent SQL tests 
> against Spark SQL on 1TB TPCDS, we are seeing
> consistently the following exceptions:
>
> 10:35:33 AM: 16/06/27 23:40:37 INFO scheduler.TaskSetManager: Finished task 
> 412.0 in stage 819.0 (TID 270396) in 8468 ms on 9.30.148.101 (417/581)
> 16/06/27 23:40:37 ERROR thriftserver.SparkExecuteStatementOperation: Error 
> executing query, currentState RUNNING,
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange SinglePartition
> +- *HashAggregate(key=[], 
> functions=[partial_sum(cs_ext_discount_amt#100849)], output=[sum#101124])
> +- *Project [cs_ext_discount_amt#100849]
>
> 
> at 
> org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:113)
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
> ... 40 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
> [300 seconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
>
>
> The longest query would complete in about 700 seconds, and I think we need to 
> increase the futures timeout value. However,
> I tried the 'spark.network.timeout' setting to 700 via the '--conf' facility 
> but it does not seem to control this particular timeout value.
> In other words, it stays at "300 seconds" no matter what value I give it. I 
> also played with the spark.rpc.askTimeout setting which
> does not affect this 300-second value.
>
> Could someone tell me which parameter I need to change in order to control it?
>
>
> JESSE CHEN
> Big Data Performance | IBM Analytics
>
> Office: 408 463 2296
> Mobile: 408 828 9068
> Email: jfc...@us.ibm.com
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DataFrame equivalent to RDD.partionByKey

2016-08-09 Thread Davies Liu
I think you are looking for `def repartition(numPartitions: Int,
partitionExprs: Column*)`
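
The same idea is available from PySpark; a hedged sketch, assuming a Spark 2.x
session named spark (the path, column name, and partition count are
illustrative):

```
from pyspark.sql import functions as F

df = spark.read.parquet("/path/to/input")
# 200 partitions, rows hash-partitioned by the "key" column, similar in
# spirit to RDD-style partitioning on that key
repartitioned = df.repartition(200, F.col("key"))
```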

On Tue, Aug 9, 2016 at 9:36 AM, Stephen Fletcher
 wrote:
> Is there a DataFrameReader equivalent to the RDD's partitionByKey for RDD?
> I'm reading data from a file data source and I want to partition this data
> I'm reading in to be partitioned the same way as the data I'm processing
> through a spark streaming RDD in the process.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 1.6.2 can read hive tables created with sqoop, but Spark 2.0.0 cannot

2016-08-09 Thread Davies Liu
Can you get all the fields back using Scala or SQL (bin/spark-sql)?

On Tue, Aug 9, 2016 at 2:32 PM, cdecleene  wrote:
> Some details of an example table hive table that spark 2.0 could not read...
>
> SerDe Library:
> org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
> InputFormat:
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
> OutputFormat:
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
>
> COLUMN_STATS_ACCURATE   false
> kite.compression.type   snappy
> numFiles0
> numRows -1
> rawDataSize -1
> totalSize0
>
> All fields within the table are of type "string" and there are less than 20
> of them.
>
> When I say that spark 2.0 cannot read the hive table, I mean that when I
> attempt to execute the following from a pyspark shell...
>
> spark = SparkSession.builder.enableHiveSupport().getOrCreate()
> df = spark.sql("SELECT * FROM dra_agency_analytics.raw_ewt_agcy_dim")
>
> ... the dataframe df has the correct number of rows and the correct columns,
> but all values read as "None".
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-6-2-can-read-hive-tables-created-with-sqoop-but-Spark-2-0-0-cannot-tp27502.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: java.lang.OutOfMemoryError: GC overhead limit exceeded when using UDFs in SparkSQL (Spark 2.0.0)

2016-08-09 Thread Davies Liu
When you have a Python UDF, only the inputs to the UDF are passed into
the Python process, but all the other fields that are used together with
the result of the UDF are kept in a queue and then joined with the result
from Python. The length of this queue depends on the number of rows under
processing by Python (or in the buffer of the Python process). The amount
of memory required also depends on how many fields are used in the results.
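
A practical consequence, shown as a hedged sketch (df stands for the cached
table, and the column and function names are made up): project down to the
columns you actually need before applying the UDF, so the rows queued in the
JVM stay narrow.

```
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

clean = F.udf(lambda s: s.strip().lower() if s is not None else None, StringType())

# Only these two columns are buffered in the JVM queue while Python
# works through the rows; the other fields of the original table are not.
slim = df.select("id", "city")
result = slim.withColumn("city_clean", clean(F.col("city")))
```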

On Tue, Aug 9, 2016 at 11:09 AM, Zoltan Fedor <zoltan.1.fe...@gmail.com> wrote:
>> Does this mean you only have 1.6G memory for executor (others left for
>> Python) ?
>> The cached table could take 1.5G, it means almost nothing left for other
>> things.
> True. I have also tried with memoryOverhead being set to 800 (10% of the 8Gb
> memory), but no difference. The "GC overhead limit exceeded" is still the
> same.
>
>> Python UDF do requires some buffering in JVM, the size of buffering
>> depends on how much rows are under processing by Python process.
> I did some more testing in the meantime.
> Leaving the UDFs as-is, but removing some other, static columns from the
> above SELECT FROM command has stopped the memoryOverhead error from
> occurring. I have plenty enough memory to store the results with all static
> columns, plus when the UDFs are not there only the rest of the static
> columns are, then it runs fine. This makes me believe that having UDFs and
> many columns causes the issue together. Maybe when you have UDFs then
> somehow the memory usage depends on the amount of data in that record (the
> whole row), which includes other fields too, which are actually not used by
> the UDF. Maybe the UDF serialization to Python serializes the whole row
> instead of just the attributes of the UDF?
>
> On Mon, Aug 8, 2016 at 5:59 PM, Davies Liu <dav...@databricks.com> wrote:
>>
>> On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor <zoltan.1.fe...@gmail.com>
>> wrote:
>> > Hi all,
>> >
>> > I have an interesting issue trying to use UDFs from SparkSQL in Spark
>> > 2.0.0
>> > using pyspark.
>> >
>> > There is a big table (5.6 Billion rows, 450Gb in memory) loaded into 300
>> > executors's memory in SparkSQL, on which we would do some calculation
>> > using
>> > UDFs in pyspark.
>> > If I run my SQL on only a portion of the data (filtering by one of the
>> > attributes), let's say 800 million records, then all works well. But
>> > when I
>> > run the same SQL on all the data, then I receive
>> > "java.lang.OutOfMemoryError: GC overhead limit exceeded" from basically
>> > all
>> > of the executors.
>> >
>> > It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
>> > causing this "GC overhead limit being exceeded".
>> >
>> > Details:
>> >
>> > - using Spark 2.0.0 on a Hadoop YARN cluster
>> >
>> > - 300 executors, each with 2 CPU cores and 8Gb memory (
>> > spark.yarn.executor.memoryOverhead=6400 )
>>
>> Does this mean you only have 1.6G memory for executor (others left for
>> Python) ?
>> The cached table could take 1.5G, it means almost nothing left for other
>> things.
>>
>> Python UDF do requires some buffering in JVM, the size of buffering
>> depends on
>> how much rows are under processing by Python process.
>>
>> > - a table of 5.6 Billions rows loaded into the memory of the executors
>> > (taking up 450Gb of memory), partitioned evenly across the executors
>> >
>> > - creating even the simplest UDF in SparkSQL causes 'GC overhead limit
>> > exceeded' error if running on all records. Running the same on a smaller
>> > dataset (~800 million rows) does succeed. If no UDF, the query succeed
>> > on
>> > the whole dataset.
>> >
>> > - simplified pyspark code:
>> >
>> > from pyspark.sql.types import StringType
>> >
>> > def test_udf(var):
>> > """test udf that will always return a"""
>> > return "a"
>> > sqlContext.registerFunction("test_udf", test_udf, StringType())
>> >
>> > sqlContext.sql("""CACHE TABLE ma""")
>> >
>> > results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
>> > test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
>> > ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC,
>> > STANDARD_ACCOUNT_CITY_SRC)
>> >  /
>> > CASE WHEN LENGTH (STANDARD_ACCOUNT_CITY

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded when using UDFs in SparkSQL (Spark 2.0.0)

2016-08-08 Thread Davies Liu
On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor  wrote:
> Hi all,
>
> I have an interesting issue trying to use UDFs from SparkSQL in Spark 2.0.0
> using pyspark.
>
> There is a big table (5.6 Billion rows, 450Gb in memory) loaded into 300
> executors's memory in SparkSQL, on which we would do some calculation using
> UDFs in pyspark.
> If I run my SQL on only a portion of the data (filtering by one of the
> attributes), let's say 800 million records, then all works well. But when I
> run the same SQL on all the data, then I receive
> "java.lang.OutOfMemoryError: GC overhead limit exceeded" from basically all
> of the executors.
>
> It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
> causing this "GC overhead limit being exceeded".
>
> Details:
>
> - using Spark 2.0.0 on a Hadoop YARN cluster
>
> - 300 executors, each with 2 CPU cores and 8Gb memory (
> spark.yarn.executor.memoryOverhead=6400 )

Does this mean you only have 1.6G of memory for the executor (the rest left for Python)?
The cached table could take 1.5G, which means almost nothing is left for other things.

Python UDFs do require some buffering in the JVM; the size of the buffering depends on
how many rows are under processing by the Python process.

> - a table of 5.6 Billions rows loaded into the memory of the executors
> (taking up 450Gb of memory), partitioned evenly across the executors
>
> - creating even the simplest UDF in SparkSQL causes 'GC overhead limit
> exceeded' error if running on all records. Running the same on a smaller
> dataset (~800 million rows) does succeed. If no UDF, the query succeed on
> the whole dataset.
>
> - simplified pyspark code:
>
> from pyspark.sql.types import StringType
>
> def test_udf(var):
> """test udf that will always return a"""
> return "a"
> sqlContext.registerFunction("test_udf", test_udf, StringType())
>
> sqlContext.sql("""CACHE TABLE ma""")
>
> results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
> test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
> ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC,
> STANDARD_ACCOUNT_CITY_SRC)
>  /
> CASE WHEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)>LENGTH
> (STANDARD_ACCOUNT_CITY_SRC)
> THEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)
> ELSE LENGTH (STANDARD_ACCOUNT_CITY_SRC)
>END),2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
> STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
> FROM ma""")
>
> results_df.registerTempTable("m")
> sqlContext.cacheTable("m")
>
> results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
> print(results_df.take(1))
>
>
> - the error thrown on the executors:
>
> 16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout
> writer for /hadoop/cloudera/parcels/Anaconda/bin/python
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
> at
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
> at
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
> at
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at
> scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
> at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
> at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
> at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
> at
> org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
> at
> org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
> 16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED
> SIGNAL TERM
>
>
> Has anybody experienced these "GC overhead limit exceeded" errors with
> pyspark UDFs before?
>
> Thanks,
> Zoltan
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: converting timestamp from UTC to many time zones

2016-06-17 Thread Davies Liu
The DataFrame API does not support this use case, but you can still
use SQL to do that:

df.selectExpr("from_utc_timestamp(start, tz) as testthis")

On Thu, Jun 16, 2016 at 9:16 AM, ericjhilton  wrote:
> This is using python with Spark 1.6.1 and dataframes.
>
> I have timestamps in UTC that I want to convert to local time, but a given
> row could be in any of several timezones. I have an 'offset' value (or
> alternately, the local timezone abbreviation. I can adjust all the
> timestamps to a single zone or with a single offset easily enough, but I
> can't figure out how to make the adjustment dependent on the 'offset' or
> 'tz' column.
>
> There appear to be 2 main ways of adjusting a timestamp: using the
> 'INTERVAL' method, or using pyspark.sql.from_utc_timestamp.
>
> Here's an example:
> ---
>
> data = [ ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1, 300,"MST"),
> ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2, 60,"EST"),
> ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3, 120,"EST"),
> ("2015-03-02 15:59:58", "2015-01-02 23:59:59", 4, 120,"PST"),
> ("2015-03-16 15:15:58", "2015-01-02 23:59:59", 5, 120,"PST"),
> ("2015-10-02 18:59:58", "2015-01-02 23:59:59", 4, 120,"PST"),
> ("2015-11-16 18:58:58", "2015-01-02 23:59:59", 5, 120,"PST"),
> ("2015-03-02 15:59:58", "2015-01-02 23:59:59", 4, 120,"MST"),
> ("2015-03-16 15:15:58", "2015-01-02 23:59:59", 5, 120,"MST"),
> ("2015-10-02 18:59:58", "2015-01-02 23:59:59", 4, 120,"MST"),
> ("2015-11-16 18:58:58", "2015-01-02 23:59:59", 5, 120,"MST"),]
>
> df = sqlCtx.createDataFrame(data, ["start_time", "end_time",
> "id","offset","tz"])
> from pyspark.sql import functions as F
>
> df.withColumn('testthis', F.from_utc_timestamp(df.start_time, "PST")).show()
> df.withColumn('testThat', df.start_time.cast("timestamp") - F.expr("INTERVAL
> 50 MINUTES")).show()
>
> 
> those last 2 lines work as expected, but I want to replace "PST" with the
> df.tz column or use the df.offset column with INTERVAL
>
>
> Here's the error I get. Is there a workaround to this?
>
> ---
> TypeError Traceback (most recent call last)
>  in ()
> > 1 df.withColumn('testthis', F.from_utc_timestamp(df.start_time,
> df.tz)).show()
>
> /opt/spark-1.6.1/python/pyspark/sql/functions.py in
> from_utc_timestamp(timestamp, tz)
> 967 """
> 968 sc = SparkContext._active_spark_context
> --> 969 return
> Column(sc._jvm.functions.from_utc_timestamp(_to_java_column(timestamp), tz))
> 970
> 971
>
> /opt/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
> __call__(self, *args)
> 796 def __call__(self, *args):
> 797 if self.converters is not None and len(self.converters) > 0:
> --> 798 (new_args, temp_args) = self._get_args(args)
> 799 else:
> 800 new_args = args
>
> /opt/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
> _get_args(self, args)
> 783 for converter in self.gateway_client.converters:
> 784 if converter.can_convert(arg):
> --> 785 temp_arg = converter.convert(arg,
> self.gateway_client)
> 786 temp_args.append(temp_arg)
> 787 new_args.append(temp_arg)
>
> /opt/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_collections.py in
> convert(self, object, gateway_client)
> 510 HashMap = JavaClass("java.util.HashMap", gateway_client)
> 511 java_map = HashMap()
> --> 512 for key in object.keys():
> 513 java_map[key] = object[key]
> 514 return java_map
>
> TypeError: 'Column' object is not callable
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/converting-timestamp-from-UTC-to-many-time-zones-tp27182.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark.GroupedData.agg works incorrectly when one column is aggregated twice?

2016-06-09 Thread Davies Liu
This one works as expected:

```
>>> spark.range(10).selectExpr("id", "id as k").groupBy("k").agg({"k": "count", "id": "sum"}).show()
+---+--------+-------+
|  k|count(k)|sum(id)|
+---+--------+-------+
|  0|       1|      0|
|  7|       1|      7|
|  6|       1|      6|
|  9|       1|      9|
|  5|       1|      5|
|  1|       1|      1|
|  3|       1|      3|
|  8|       1|      8|
|  2|       1|      2|
|  4|       1|      4|
+---+--------+-------+
```

Have you tried removing the orderBy? That looks weird.
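
The underlying issue is on the Python side: a dict literal cannot hold the
same key twice, so {'is_booking':'count', 'is_booking':'sum'} collapses to a
single entry before Spark ever sees it. A hedged sketch that avoids the dict
form by passing explicit aggregate expressions (df stands for the DataFrame
read from parquet in the question; alias names are made up):

```
import pyspark.sql.functions as fn

user_id_books_clicks = (df.groupBy('user_id')
                          .agg(fn.count('is_booking').alias('bookings_count'),
                               fn.sum('is_booking').alias('bookings_sum'))
                          .orderBy(fn.desc('bookings_count')))
```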


On Fri, May 27, 2016 at 4:28 AM, Andrew Vykhodtsev  wrote:
> Dear list,
>
> I am trying to calculate sum and count on the same column:
>
> user_id_books_clicks =
> (sqlContext.read.parquet('hdfs:///projects/kaggle-expedia/input/train.parquet')
>   .groupby('user_id')
>   .agg({'is_booking':'count',
> 'is_booking':'sum'})
>   .orderBy(fn.desc('count(user_id)'))
>   .cache()
>)
>
> If I do it like that, it only gives me one (last) aggregate -
> sum(is_booking)
>
> But if I change to .agg({'user_id':'count', 'is_booking':'sum'})  -  it
> gives me both. I am on 1.6.1. Is it fixed in 2.+? Or should I report it to
> JIRA?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: 2 tables join happens at Hive but not in spark

2016-05-18 Thread Davies Liu
What the schema of the two tables looks like? Could you also show the
explain of the query?

On Sat, Feb 27, 2016 at 2:10 AM, Sandeep Khurana  wrote:
> Hello
>
> We have 2 tables (tab1, tab2) exposed using hive. The data is in different
> hdfs folders. We are trying to join these 2 tables on a certain single column
> using sparkR join. But in spite of the join columns having the same values, it
> returns zero rows.
>
> But when I run the same join sql in hive, from hive console, to get the
> count(*), I do get millions of records meeting the join criteria.
>
> The join columns are of 'int' type. Also, when I join 'tab1' from one of
> these 2 tables for which join is not working with another 3rd table 'tab3'
> separately, that join works.
>
> To debug, we selected just 1 row in the sparkR script from tab1 and also 1
> row having the same value of the join column from tab2. We used the
> 'select' sparkR function for this. Now, our dataframes for tab1 and tab2
> have single row each and the join columns have same value in both, but still
> joining these 2 dataframes having single row each and with same join column,
> the join returned zero rows.
>
>
> We are running the script from rstudio. It does not give any error. It runs
> fine. But gives zero join results whereas on hive I do get many rows for
> same join. Any idea what might be the cause of this?
>
>
>
> --
> Architect
> Infoworks.io
> http://Infoworks.io

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: broadcast variable not picked up

2016-05-16 Thread Davies Liu
broadcast_var is only defined in foo(); I think you should declare it `global` there:

def foo():
   global broadcast_var
   broadcast_var = sc.broadcast(var)
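
A hedged, runnable sketch of the whole pattern (names follow the original
snippet; the input data and the processing inside kernel are placeholders):

```
from pyspark import SparkContext

sc = SparkContext(appName="broadcast-example")
broadcast_var = None   # module-level name so worker-side closures can resolve it

def kernel(arg):
    # broadcast_var is looked up as a global when the closure runs on executors
    result = broadcast_var.value + arg
    # ... some processing with result ...

def foo(var, rdd):
    global broadcast_var            # rebind the module-level name, not a local
    broadcast_var = sc.broadcast(var)
    rdd.foreach(kernel)

foo(10, sc.parallelize(range(5)))
```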

On Fri, May 13, 2016 at 3:53 PM, abi  wrote:
> def kernel(arg):
> input = broadcast_var.value + 1
> #some processing with input
>
> def foo():
>   
>   
>   broadcast_var = sc.broadcast(var)
>   rdd.foreach(kernel)
>
>
> def main():
>#something
>
>
> In this code , I get the following error:
> NameError: global name 'broadcast_var ' is not defined
>
>
> Any ideas on how to fix it ?
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/broadcast-variable-not-picked-up-tp26955.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark dataframe sort issue

2016-05-08 Thread Davies Liu
When you have multiple parquet files, the order of all the rows in
them is not defined.
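
Two hedged workarounds, depending on what you need (reusing the df/sqlContext
names from the question; paths are illustrative):

```
# Option 1: collapse to a single partition and sort within it, so one
# parquet file is written in sorted order (only reasonable for small data).
(df.coalesce(1)
   .sortWithinPartitions(df.value.desc())
   .write.parquet("/sorted/single_file.parquet"))

# Option 2: keep many files and simply re-apply the sort after reading.
df2 = sqlContext.read.parquet("/sorted/file.parquet")
df2.sort(df2.value.desc()).head(3)
```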

On Sat, May 7, 2016 at 11:48 PM, Buntu Dev  wrote:
> I'm using pyspark dataframe api to sort by specific column and then saving
> the dataframe as parquet file. But the resulting parquet file doesn't seem
> to be sorted.
>
> Applying sort and doing a head() on the results shows the correct results
> sorted by 'value' column in desc order, as shown below:
>
> ~
>>>df=sqlContext.read.parquet("/some/file.parquet")
>>>df.printSchema()
>
> root
>  |-- c1: string (nullable = true)
>  |-- c2: string (nullable = true)
>  |-- value: double (nullable = true)
>
>>>df.sort(df.value.desc()).head(3)
>
> [Row(c1=u'546', c2=u'234', value=1020.25), Row(c1=u'3212', c2=u'6785',
> value=890.6), Row(c1=u'546', c2=u'234', value=776.45)]
> ~~
>
> But saving the sorted dataframe as parquet and fetching the first N rows
> using head() doesn't seem to return the results ordered by 'value' column:
>
> 
>>>df=sqlContext.read.parquet("/some/file.parquet")
>>>df.sort(df.value.desc()).write.parquet("/sorted/file.parquet")
> ...
>>>df2=sqlContext.read.parquet("/sorted/file.parquet")
>>>df2.head(3)
>
> [Row(c1=u'444', b2=u'233', value=0.024120907), Row(c1=u'5672', c2=u'9098',
> value=0.024120906), Row(c1=u'546', c2=u'234', value=0.024120905)]
> 
>
> How do I go about sorting and saving a sorted dataframe?
>
>
> Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Davies Liu
Bingo: the two predicates s.date >= '2016-01-03' AND d.date >=
'2016-01-03' are the root cause. They filter out all the nulls produced
by the outer join, so it gives the same result as an inner join.

In Spark 2.0, we actually turn these joins into inner joins.
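
If the intent was a true outer join restricted to dates on or after
2016-01-03, a hedged rewrite is to filter each side before joining (table and
column names are from the thread), so unmatched rows can still appear with
NULLs instead of being dropped by the WHERE clause:

```
sqlContext.sql("""
    SELECT s.date AS edate, s.account AS s_acc, d.account AS d_acc,
           s.ad AS s_ad, d.ad AS d_ad, s.spend AS s_spend,
           d.spend_in_dollar AS d_spend
    FROM (SELECT * FROM swig_pin_promo_lt WHERE date >= '2016-01-03') s
    FULL OUTER JOIN
         (SELECT * FROM dps_pin_promo_lt WHERE date >= '2016-01-03') d
      ON s.date = d.date AND s.account = d.account AND s.ad = d.ad
""").count()
```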

On Tue, May 3, 2016 at 9:50 AM, Cesar Flores <ces...@gmail.com> wrote:
> Hi
>
> Have you tried the joins without the where clause? When you use them you are
> filtering all the rows with null columns in those fields. In other words you
> are doing a inner join in all your queries.
>
> On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>>
>> Hi Kevin,
>>
>> Having given it a first look I do think that you have hit something here
>> and this does not look quite fine. I have to work on the multiple AND
>> conditions in ON and see whether that is causing any issues.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, May 3, 2016 at 8:28 AM, Kevin Peng <kpe...@gmail.com> wrote:
>>>
>>> Davies,
>>>
>>> Here is the code that I am typing into the spark-shell along with the
>>> results (my question is at the bottom):
>>>
>>> val dps =
>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>>> "true").load("file:///home/ltu/dps_csv/")
>>> val swig =
>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>>> "true").load("file:///home/ltu/swig_csv/")
>>>
>>> dps.count
>>> res0: Long = 42694
>>>
>>> swig.count
>>> res1: Long = 42034
>>>
>>>
>>> dps.registerTempTable("dps_pin_promo_lt")
>>> swig.registerTempTable("swig_pin_promo_lt")
>>>
>>> sqlContext.sql("select * from dps_pin_promo_lt where date >
>>> '2016-01-03'").count
>>> res4: Long = 42666
>>>
>>> sqlContext.sql("select * from swig_pin_promo_lt where date >
>>> '2016-01-03'").count
>>> res5: Long = 34131
>>>
>>> sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
>>> where date > '2016-01-03'").count
>>> res6: Long = 42533
>>>
>>> sqlContext.sql("select distinct date, account, ad from swig_pin_promo_lt
>>> where date > '2016-01-03'").count
>>> res7: Long = 34131
>>>
>>>
>>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
>>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
>>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
>>> res9: Long = 23809
>>>
>>>
>>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
>>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
>>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
>>> res10: Long = 23809
>>>
>>>
>>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
>>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
>>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
>>> res11: Long = 23809
>>>
>>>
>>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
>>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
>>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
>>> res12: Long = 23809
>>>
>>>
>>>
>>> From my results above, we notice that the counts of distinct values based
>>> on the join criteria and filter criteria for each individual table are
>>> located at res6 and res7.  My question is why the outer joins are producing
>>> fewer rows than these individual distinct counts.

Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Davies Liu
As @Gourav said, all the joins with different join types show the same result,
which means that every row from the left matches at least one row from the right,
and every row from the right matches at least one row from the left, even though
the number of rows on the left does not equal the number on the right.

This is the correct result.
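
For illustration, here is a minimal PySpark sketch (toy data, not the poster's
dataset) of how inner, left, right and full outer joins can all return the same
count when every row on each side matches at least one row on the other side,
even though the two inputs have different sizes:

    from pyspark.sql import Row

    left = sqlContext.createDataFrame([Row(k=1, a="x"), Row(k=1, a="y"), Row(k=2, a="z")])
    right = sqlContext.createDataFrame([Row(k=1, b="p"), Row(k=2, b="q")])

    for how in ("inner", "left_outer", "right_outer", "full_outer"):
        # every key on each side matches, so all four counts come out as 3
        print(how, left.join(right, "k", how).count())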

On Mon, May 2, 2016 at 2:13 PM, Kevin Peng  wrote:
> Yong,
>
> Sorry, let me explain my deduction; it is going to be difficult to get a sample
> data set out since the dataset I am using is proprietary.
>
> From the above set of queries (the ones mentioned in the comments above), both
> inner and outer joins are producing the same counts.  They are basically pulling
> out selected columns from the query, but there is no roll-up happening or
> anything that would possibly make it suspicious that there is any difference
> besides the type of join.  The tables are matched based on three keys that
> are present in both tables (ad, account, and date); on top of this they are
> filtered by date being above 2016-01-03.  Since all the joins are producing
> the same counts, the natural suspicion is that the tables are identical,
> but when I run the following two queries:
>
> scala> sqlContext.sql("select * from swig_pin_promo_lt where date
>>='2016-01-03'").count
>
> res14: Long = 34158
>
> scala> sqlContext.sql("select * from dps_pin_promo_lt where date
>>='2016-01-03'").count
>
> res15: Long = 42693
>
>
> The above two queries filter the data based on the same date cutoff used by the
> joins (2016-01-03), and you can see the row counts of the two tables are
> different, which is why I suspect something is wrong with the outer joins in
> Spark SQL: in this situation the right and full outer joins may produce the same
> results, but they should not be equal to the left join and definitely not to the
> inner join; unless I am missing something.
>
>
> Side note: In my haste response above I posted the wrong counts for
> dps.count, the real value is res16: Long = 42694
>
>
> Thanks,
>
>
> KP
>
>
>
>
> On Mon, May 2, 2016 at 12:50 PM, Yong Zhang  wrote:
>>
>> We are still not sure what is the problem, if you cannot show us with some
>> example data.
>>
>> For dps with 42632 rows and swig with 42034 rows: dps full outer join with
>> swig on 3 columns, with additional filters, gets the same result-set row
>> count as dps left outer join with swig on 3 columns with the additional
>> filters, which again is the same result-set row count as dps right outer join
>> with swig on 3 columns with the same additional filters.
>>
>> Without knowing your data, I cannot see the reason that has to be a bug in
>> the spark.
>>
>> Am I misunderstanding your bug?
>>
>> Yong
>>
>> 
>> From: kpe...@gmail.com
>> Date: Mon, 2 May 2016 12:11:18 -0700
>> Subject: Re: Weird results with Spark SQL Outer joins
>> To: gourav.sengu...@gmail.com
>> CC: user@spark.apache.org
>>
>>
>> Gourav,
>>
>> I wish that was case, but I have done a select count on each of the two
>> tables individually and they return back different number of rows:
>>
>>
>> dps.registerTempTable("dps_pin_promo_lt")
>> swig.registerTempTable("swig_pin_promo_lt")
>>
>>
>> dps.count()
>> RESULT: 42632
>>
>>
>> swig.count()
>> RESULT: 42034
>>
>> On Mon, May 2, 2016 at 11:55 AM, Gourav Sengupta
>>  wrote:
>>
>> This shows that both the tables have matching records and no mismatches.
>> Therefore obviously you have the same results irrespective of whether you
>> use right or left join.
>>
>> I think that there is no problem here, unless I am missing something.
>>
>> Regards,
>> Gourav
>>
>> On Mon, May 2, 2016 at 7:48 PM, kpeng1  wrote:
>>
>> Also, the results of the inner query produced the same results:
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
>> =
>> d.ad) WHERE s.date >= '2016-01-03'AND d.date >= '2016-01-03'").count()
>> RESULT:23747
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861p26863.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Save RDD to HDFS using Spark Python API

2016-04-26 Thread Davies Liu
hdfs://192.168.10.130:9000/dev/output/test already exists, so you need
to remove it first.
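
For reference, a hedged sketch (not from this thread) of deleting the existing
output path from PySpark before saving again, using the JVM Hadoop FileSystem
API reached through SparkContext's private gateway attributes:

    hadoop = sc._jvm.org.apache.hadoop
    conf = sc._jsc.hadoopConfiguration()
    path = hadoop.fs.Path("hdfs://192.168.10.130:9000/dev/output/test")
    fs = path.getFileSystem(conf)
    if fs.exists(path):
        fs.delete(path, True)  # recursive delete of the old output directory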

On Tue, Apr 26, 2016 at 5:28 AM, Luke Adolph  wrote:
> Hi, all:
> Below is my code:
>
> from pyspark import *
> import re
>
> def getDateByLine(input_str):
>     str_pattern = '^\d{4}-\d{2}-\d{2}'
>     pattern = re.compile(str_pattern)
>     match = pattern.match(input_str)
>     if match:
>         return match.group()
>     else:
>         return None
>
> file_url = "hdfs://192.168.10.130:9000/dev/test/test.log"
> input_file = sc.textFile(file_url)
> line = input_file.filter(getDateByLine).map(lambda x: (x[:10], 1))
> counts = line.reduceByKey(lambda a,b: a+b)
> print counts.collect()
> counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test",\
> "org.apache.hadoop.mapred.SequenceFileOutputFormat")
>
>
> What confuses me is the method saveAsHadoopFile. I have read the PySpark API,
> but I still don't understand what the second arg (outputFormatClass) means.
>
> Below is the output when I run the above code:
>
>
> [(u'2016-02-29', 99), (u'2016-03-02', 30)]
>
> ---
> Py4JJavaError Traceback (most recent call last)
>  in ()
>  18 counts = line.reduceByKey(lambda a,b: a+b)
>  19 print counts.collect()
> ---> 20
> counts.saveAsHadoopFile("hdfs://192.168.10.130:9000/dev/output/test",
> "org.apache.hadoop.mapred.SequenceFileOutputFormat")
>
> /mydata/softwares/spark-1.6.1/python/pyspark/rdd.pyc in
> saveAsHadoopFile(self, path, outputFormatClass, keyClass, valueClass,
> keyConverter, valueConverter, conf, compressionCodecClass)
>1419  keyClass,
> valueClass,
>1420  keyConverter,
> valueConverter,
> -> 1421  jconf,
> compressionCodecClass)
>1422
>1423 def saveAsSequenceFile(self, path, compressionCodecClass=None):
>
> /mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py
> in __call__(self, *args)
> 811 answer = self.gateway_client.send_command(command)
> 812 return_value = get_return_value(
> --> 813 answer, self.gateway_client, self.target_id, self.name)
> 814
> 815 for temp_arg in temp_args:
>
> /mydata/softwares/spark-1.6.1/python/pyspark/sql/utils.pyc in deco(*a, **kw)
>  43 def deco(*a, **kw):
>  44 try:
> ---> 45 return f(*a, **kw)
>  46 except py4j.protocol.Py4JJavaError as e:
>  47 s = e.java_exception.toString()
>
> /mydata/softwares/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py
> in get_return_value(answer, gateway_client, target_id, name)
> 306 raise Py4JJavaError(
> 307 "An error occurred while calling {0}{1}{2}.\n".
> --> 308 format(target_id, ".", name), value)
> 309 else:
> 310 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.saveAsHadoopFile.
> : org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory
> hdfs://192.168.10.130:9000/dev/output/test already exists
>   at
> org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
>   at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1179)
>   at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
>   at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
>   at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>   at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>   at
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1156)
>   at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
>   at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
>   at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
>   at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>   at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>   at
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
>   at
> org.apache.spark.api.python.PythonRDD$.saveAsHadoopFile(PythonRDD.scala:753)
>   at 
> 

Re: EOFException while reading from HDFS

2016-04-26 Thread Davies Liu
The Spark package you are using is packaged with Hadoop 2.6, but your
HDFS is Hadoop 1.0.4; they are not compatible.

On Tue, Apr 26, 2016 at 11:18 AM, Bibudh Lahiri  wrote:
> Hi,
>   I am trying to load a CSV file which is on HDFS. I have two machines:
> IMPETUS-1466 (172.26.49.156) and IMPETUS-1325 (172.26.49.55). Both have
> Spark 1.6.0 pre-built for Hadoop 2.6 and later, but for both, I had existing
> Hadoop clusters running Hadoop 1.0.4. I have launched HDFS from
> 172.26.49.156 by running start-dfs.sh from it, copied files from local file
> system to HDFS and can view them by hadoop fs -ls.
>
>   However, when I am trying to load the CSV file from pyspark shell
> (launched by bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0)
> from IMPETUS-1325 (172.26.49.55) with the following commands:
>
>
>>>from pyspark.sql import SQLContext
>
>>>sqlContext = SQLContext(sc)
>
>>>patients_df =
>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>>> "false").load("hdfs://172.26.49.156:54310/bibudh/healthcare/data/cloudera_challenge/patients.csv")
>
>
> I get the following error:
>
>
> java.io.EOFException: End of File Exception between local host is:
> "IMPETUS-1325.IMPETUS.CO.IN/172.26.49.55"; destination host is:
> "IMPETUS-1466":54310; : java.io.EOFException; For more details see:
> http://wiki.apache.org/hadoop/EOFException
>
>
> I have changed the port number from 54310 to 8020, but then I get the error
>
>
> java.net.ConnectException: Call From IMPETUS-1325.IMPETUS.CO.IN/172.26.49.55
> to IMPETUS-1466:8020 failed on connection exception:
> java.net.ConnectException: Connection refused; For more details see:
> http://wiki.apache.org/hadoop/ConnectionRefused
>
>
> To me it seemed like this may result from a version mismatch between Spark
> Hadoop client and my Hadoop cluster, so I have made the following changes:
>
>
> 1) Added the following lines to conf/spark-env.sh
>
>
> export HADOOP_HOME="/usr/local/hadoop-1.0.4"
> export HADOOP_CONF_DIR="$HADOOP_HOME/conf"
> export HDFS_URL="hdfs://172.26.49.156:8020"
>
>
> 2) Downloaded Spark 1.6.0, pre-built with user-provided Hadoop, and in
> addition to the three lines above, added the following line to
> conf/spark-env.sh
>
>
> export SPARK_DIST_CLASSPATH="/usr/local/hadoop-1.0.4/bin/hadoop"
>
>
> but none of it seems to work. However, the following command works from
> 172.26.49.55 and gives the directory listing:
>
> /usr/local/hadoop-1.0.4/bin/hadoop fs -ls hdfs://172.26.49.156:54310/
>
>
> Any suggestion?
>
>
> Thanks
>
> Bibudh
>
>
> --
> Bibudh Lahiri
> Data Scientist, Impetus Technolgoies
> 5300 Stevens Creek Blvd
> San Jose, CA 95129
> http://knowthynumbers.blogspot.com/
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark EOFError after calling map

2016-04-22 Thread Davies Liu
This exception is already handled; it is just noisy and should be muted.

On Wed, Apr 13, 2016 at 4:52 PM, Pete Werner  wrote:

> Hi
>
> I am new to spark & pyspark.
>
> I am reading a small csv file (~40k rows) into a dataframe.
>
> from pyspark.sql import functions as F
> from pyspark.sql import Row
> from pyspark.mllib.linalg import Vectors  # imports for Row/Vectors added for completeness (assuming MLlib Vectors)
>
> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
>     inferschema='true').load('/tmp/sm.csv')
> df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
> df2 = df.map(lambda x: Row(label=float(x[0]),
>     features=Vectors.dense(x[1:]))).toDF()
>
> I get some weird error that does not occur every single time, but does
> happen pretty regularly
>
> >>> df2.show(1)
> +--------------------+-----+
> |            features|label|
> +--------------------+-----+
> |[0.0,0.0,0.0,0.0,...|  0.0|
> +--------------------+-----+
> only showing top 1 row
>
> >>> df2.count()
> 41999
>
> >>> df2.show(1)
> +--------------------+-----+
> |            features|label|
> +--------------------+-----+
> |[0.0,0.0,0.0,0.0,...|  0.0|
> +--------------------+-----+
> only showing top 1 row
>
> >>> df2.count()
> 41999
>
> >>> df2.show(1)
> Traceback (most recent call last):
>   File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157,
> in manager
>   File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in
> worker
>   File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136,
> in main
> if read_int(infile) == SpecialLengths.END_OF_STREAM:
>   File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line
> 545, in read_int
> raise EOFError
> EOFError
> +--------------------+---------+
> |            features|    label|
> +--------------------+---------+
> |[0.0,0.0,0.0,0.0,...|4700734.0|
> +--------------------+---------+
> only showing top 1 row
>
> Once that EOFError has been raised, I will not see it again until I do
> something that requires interacting with the spark server
>
> When I call df2.count() it shows that [Stage xxx] prompt which is what I
> mean by it going to the spark server.
>
> Anything that triggers that seems to eventually end up giving the EOFError
> again when I do something with df2.
>
> It does not seem to happen with df (vs. df2) so seems like it must be
> something happening with the df.map() line.
>
> --
>
> Pete Werner
> Data Scientist
> Freelancer.com
>
> Level 20
> 680 George Street
> Sydney NSW 2000
>
> e: pwer...@freelancer.com
> p:  +61 2 8599 2700
> w: http://www.freelancer.com
>
>


Re: How to estimate the size of dataframe using pyspark?

2016-04-11 Thread Davies Liu
That's weird; DataFrame.count() should not require a lot of memory on the
driver. Could you provide a way to reproduce it (you could generate a fake
dataset)?
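
For example, a hypothetical fake-data generator in PySpark (roughly matching
the "3 INT columns" mentioned below) that could be used to try to reproduce
the problem:

    from pyspark.sql import Row
    import random

    # ~40M rows of three integer columns, spread over 1000 partitions
    rdd = sc.parallelize(range(1000), 1000).flatMap(
        lambda _: (Row(a=random.randint(0, 1000),
                       b=random.randint(0, 1000),
                       c=random.randint(0, 1000))
                   for _ in range(40000)))
    df = sqlContext.createDataFrame(rdd)
    print(df.count())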

On Sat, Apr 9, 2016 at 4:33 PM, Buntu Dev  wrote:
> I've allocated about 4g for the driver. For the count stage, I notice the
> Shuffle Write to be 13.9 GB.
>
> On Sat, Apr 9, 2016 at 11:43 AM, Ndjido Ardo BAR  wrote:
>>
>> What's the size of your driver?
>> On Sat, 9 Apr 2016 at 20:33, Buntu Dev  wrote:
>>>
>>> Actually, df.show() works displaying 20 rows but df.count() is the one
>>> which is causing the driver to run out of memory. There are just 3 INT
>>> columns.
>>>
>>> Any idea what could be the reason?
>>>
>>> On Sat, Apr 9, 2016 at 10:47 AM,  wrote:

 You seem to have a lot of columns :-) !
 df.count() gives the number of rows of your data frame,
 and len(df.columns) the number of columns.

 Finally, I suggest you check the size of your driver and customize it
 accordingly.

 Cheers,

 Ardo

 Sent from my iPhone

 > On 09 Apr 2016, at 19:37, bdev  wrote:
 >
 > I keep running out of memory on the driver when I attempt to do
 > df.show().
 > Can anyone let me know how to estimate the size of the dataframe?
 >
 > Thanks!
 >
 >
 >
 > --
 > View this message in context:
 > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-estimate-the-size-of-dataframe-using-pyspark-tp26729.html
 > Sent from the Apache Spark User List mailing list archive at
 > Nabble.com.
 >
 > -
 > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 > For additional commands, e-mail: user-h...@spark.apache.org
 >
>>>
>>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: strange behavior of pyspark RDD zip

2016-04-11 Thread Davies Liu
It seems like a bug; could you file a JIRA for this
(and also post a way to reproduce it)?
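
In the meantime, one hedged workaround sketch (not from this thread) is to
align the two RDDs through an explicit index instead of relying on zip(),
which requires matching partitioning and batch sizes:

    def zip_by_index(rdd1, rdd2):
        # pair every element with its position, then join on that position
        left = rdd1.zipWithIndex().map(lambda kv: (kv[1], kv[0]))
        right = rdd2.zipWithIndex().map(lambda kv: (kv[1], kv[0]))
        return left.join(right).sortByKey().values()

    # usage: zip_by_index(rdd1, rdd2).count()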


On Fri, Apr 1, 2016 at 11:08 AM, Sergey  wrote:
> Hi!
>
> I'm on Spark 1.6.1 in local mode on Windows.
>
> And I have an issue with zipping two RDDs of __equal__ size and
> __equal__ number of partitions (I also tried to repartition both RDDs to one
> partition).
> I get this exception when I do rdd1.zip(rdd2).count():
>
> File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main
>   File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 106, in
> process
>   File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 263,
> in dump_stream
> vs = list(itertools.islice(iterator, batch))
>   File "c:\spark\python\pyspark\rddsampler.py", line 95, in func
> for obj in iterator:
>   File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 322,
> in load_stream
> " in pair: (%d, %d)" % (len(keys), len(vals)))
> ValueError: Can not deserialize RDD with different number of items in pair:
> (256, 512)
>
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Davies Liu
On Wed, Mar 23, 2016 at 10:35 AM, Yong Zhang  wrote:
> Here is the output:
>
> == Parsed Logical Plan ==
> Project [400+ columns]
> +- Project [400+ columns]
>+- Project [400+ columns]
>   +- Project [400+ columns]
>  +- Join Inner, Some((((visid_high#460L = visid_high#948L) &&
> (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> :- Relation[400+ columns] ParquetRelation
> +- BroadcastHint
>+- Project [soid_e1#30 AS
> account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>   +- Filter (instr(event_list#105,202) > 0)
>  +- Relation[400+ columns] ParquetRelation
>
> == Analyzed Logical Plan ==
> 400+ columns
> Project [400+ columns]
> +- Project [400+ columns]
>+- Project [400+ columns]
>   +- Project [400+ columns]
>  +- Join Inner, Some((((visid_high#460L = visid_high#948L) &&
> (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> :- Relation[400+ columns] ParquetRelation
> +- BroadcastHint
>+- Project [soid_e1#30 AS
> account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>   +- Filter (instr(event_list#105,202) > 0)
>  +- Relation[400+ columns] ParquetRelation
>
> == Optimized Logical Plan ==
> Project [400+ columns]
> +- Join Inner, Some((((visid_high#460L = visid_high#948L) && (visid_low#461L
> = visid_low#949L)) && (date_time#25L > date_time#513L)))
>:- Relation[400+ columns] ParquetRelation
>+- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
>   +- BroadcastHint
>  +- Project [soid_e1#30 AS
> account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> +- Filter (instr(event_list#105,202) > 0)
>+- Relation[400+ columns] ParquetRelation

There is a Project on top of the BroadcastHint, inserted by the
column pruning rule, which makes the Spark strategy unable to
recognize the BroadcastHint anymore; it was fixed
recently in master [1]

https://github.com/apache/spark/pull/11260

Your join should run as expected in master.
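
For what it's worth, a PySpark rendering of the same check (a sketch only;
pyspark.sql.functions.broadcast is available from 1.6, and the DataFrame names
follow this thread):

    from pyspark.sql.functions import broadcast

    join2 = historyRaw.join(broadcast(trialRaw),
                            (trialRaw.visid_high == historyRaw.visid_high) &
                            (trialRaw.visid_low == historyRaw.visid_low) &
                            (trialRaw.date_time > historyRaw.date_time))
    join2.explain(True)  # look for BroadcastHashJoin in the == Physical Plan == section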

> == Physical Plan ==
> Project [400+ columns]
> +- Filter (date_time#25L > date_time#513L)
>+- SortMergeJoin [visid_high#948L,visid_low#949L],
> [visid_high#460L,visid_low#461L]
>   :- Sort [visid_high#948L ASC,visid_low#949L ASC], false, 0
>   :  +- TungstenExchange
> hashpartitioning(visid_high#948L,visid_low#949L,200), None
>   : +- Scan ParquetRelation[400+ columns] InputPaths:
> hdfs://xxx/2015/12/17, hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19,
> hdfs://xxx/2015/12/20, hdfs://xxx/2015/12/21, hdfs://xxx/2015/12/22,
> hdfs://xxx/2015/12/23, hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25,
> hdfs://xxx/2015/12/26, hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28,
> hdfs://xxx/2015/12/29, hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31,
> hdfs://xxx/2016/01/01, hdfs://xxx/2016/01/02, hdfs://xxx/2016/01/03,
> hdfs://xxx/2016/01/04, hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06,
> hdfs://xxx/2016/01/07, hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09,
> hdfs://xxx/2016/01/10, hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12,
> hdfs://xxx/2016/01/13, hdfs://xxx/2016/01/14, hdfs://xxx/2016/01/15,
> hdfs://xxx/2016/01/16, hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18,
> hdfs://xxx/2016/01/19, hdfs://xxx/2016/01/20, hdfs://xxx/2016/01/21,
> hdfs://xxx/2016/01/22, hdfs://xxx/2016/01/23, hdfs://xxx/2016/01/24,
> hdfs://xxx/2016/01/25, hdfs://xxx/2016/01/26, hdfs://xxx/2016/01/27,
> hdfs://xxx/2016/01/28, hdfs://xxx/2016/01/29, hdfs://xxx/2016/01/30,
> hdfs://xxx/2016/01/31, hdfs://xxx/2016/02/01, hdfs://xxx/2016/02/02,
> hdfs://xxx/2016/02/03, hdfs://xxx/2016/02/04, hdfs://xxx/2016/02/05,
> hdfs://xxx/2016/02/06, hdfs://xxx/2016/02/07, hdfs://xxx/2016/02/08,
> hdfs://xxx/2016/02/09, hdfs://xxx/2016/02/10, hdfs://xxx/2016/02/11,
> hdfs://xxx/2016/02/12, hdfs://xxx/2016/02/13, hdfs://xxx/2016/02/14,
> hdfs://xxx/2016/02/15, hdfs://xxx/2016/02/16, hdfs://xxx/2016/02/17,
> hdfs://xxx/2016/02/18, hdfs://xxx/2016/02/19, hdfs://xxx/2016/02/20,
> hdfs://xxx/2016/02/21, hdfs://xxx/2016/02/22, hdfs://xxx/2016/02/23,
> hdfs://xxx/2016/02/24, hdfs://xxx/2016/02/25, hdfs://xxx/2016/02/26,
> hdfs://xxx/2016/02/27, hdfs://xxx/2016/02/28, hdfs://xxx/2016/02/29,
> hdfs://xxx/2016/03/01, hdfs://xxx/2016/03/02, hdfs://xxx/2016/03/03,
> hdfs://xxx/2016/03/04, hdfs://xxx/2016/03/05, hdfs://xxx/2016/03/06,
> hdfs://xxx/2016/03/07, hdfs://xxx/2016/03/08, hdfs://xxx/2016/03/09,
> hdfs://xxx/2016/03/10, hdfs://xxx/2016/03/11, hdfs://xxx/2016/03/12,
> hdfs://xxx/2016/03/13, hdfs://xxx/2016/03/14, hdfs://xxx/2016/03/15,
> hdfs://xxx/2016/03/16, hdfs://xxx/2016/03/17
>   +- Sort [visid_high#460L ASC,visid_low#461L ASC], false, 0
>  +- TungstenExchange
> hashpartitioning(visid_high#460L,visid_low#461L,200), None
> +- Project
> 

Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Davies Liu
The broadcast hint does not work as expected in this case; could you
also show the logical plan with 'explain(true)'?

On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang  wrote:
>
> So I am testing this code to understand "broadcast" feature of DF on Spark 
> 1.6.1.
> This time I am not disable "tungsten". Everything is default value, except 
> setting memory and cores of my job on 1.6.1.
>
> I am testing the join2 case
>
> val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> and here is the DAG visualization in the runtime of my testing job:
>
>
>
>
>
> So now, I don't understand how the "broadcast" works on DataFrame in Spark. I
> originally thought it would be the same as "mapjoin" in Hive, but can
> someone explain the DAG above to me?
>
> I have one day of data, about a 1.5G compressed parquet file, filtered by
> "instr(loadRaw("event_list"), "202") > 0", which will only output about 1494
> rows (very small), and it is the "trialRaw" DF in my example.
> Stage 3 has a filter, which I thought is for the trialRaw data, but the stage
> statistics don't match the data. I don't know why the input is only 78M
> and the shuffle write is about 97.6KB
>
>
>
>
> The historyRaw will be about 90 days history data, which should be about 
> 100G, so it looks like stage 4 is scanning it
>
>
>
>
> Now, my original thought was that the small data would be broadcast to all the
> nodes, and most of the history data would be filtered out by the join keys; at
> least that is what "mapjoin" in Hive would do, but from the DAG above, I
> didn't see it working this way.
> It is more like Spark uses the SortMerge join to shuffle both datasets across the
> network, and filters on the "reducers" side by the join keys to get the final
> output. But that is not what the "broadcast" join is supposed to do, correct?
> The last stage will be very slow until it reaches and processes all the
> history data, shown below as "shuffle read" reaching 720G, to finish.
>
>
>
>
> One thing I notice is that if tungsten is enabled, the shuffle write volume on
> stage 4 is larger (720G) than when tungsten is disabled (506G) in my
> original run, for exactly the same input. It is an interesting point; does
> anyone have some idea about this?
>
>
> Overall, for my test case, "broadcast" join is exactly the most optimized approach
> I should use; but somehow, I cannot make it behave the same way as "mapjoin" in
> Hive, even in Spark 1.6.1.
>
> As I said, this is just a test case. We have some business cases where it makes sense
> to use "broadcast" join, but until I understand exactly how to make it work
> as I expect in Spark, I don't know what to do.
>
> Yong
>
> 
> From: java8...@hotmail.com
> To: user@spark.apache.org
> Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in the 
> last step
> Date: Tue, 22 Mar 2016 13:08:31 -0400
>
>
> Please help me understand how the "broadcast" will work on DF in Spark 1.5.2.
>
> Below are the 2 joins I tested and the physical plan I dumped:
>
> val join1 = historyRaw.join(trialRaw, trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> join1.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  SortMergeJoin [visid_high#948L,visid_low#949L], 
> [visid_high#460L,visid_low#461L]
>   ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false
>Exchange hashpartitioning(visid_high#948L,visid_low#949L)
> Scan ParquetRelation[hdfs://]
>   ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false
>Exchange hashpartitioning(visid_high#460L,visid_low#461L)
> Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>  Filter (instr(event_list#105,202) > 0)
>   Scan 
> ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>
> join2.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  BroadcastHashJoin [visid_high#948L,visid_low#949L], 
> [visid_high#460L,visid_low#461L], BuildRight
> Scan ParquetRelation[hdfs://]
> Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>Filter (instr(event_list#105,202) > 0)
> Scan 
> ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>
> Obvious, the explain plans are different, but the performance and the job 
> execution steps are almost exactly same, as shown in the original picture in 
> the email below.
> Keep in 

Re: unix_timestamp() time zone problem

2016-03-19 Thread Davies Liu
Could you try to cast the timestamp as long?

Internally, timestamps are stored as microseconds in UTC, so you will get
seconds in UTC if you cast the column to long.
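
A minimal PySpark sketch of that suggestion (table/column names are the
poster's):

    from pyspark.sql.functions import col

    df = sqlContext.sql("select row_key, created, count from rawTable")
    # casting a timestamp to long yields seconds since the epoch in UTC,
    # independent of the session time zone
    df.select("row_key", col("created").cast("long").alias("epoch_seconds_utc")).show()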

On Thu, Mar 17, 2016 at 1:28 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I am using python spark 1.6 and the --packages
> datastax:spark-cassandra-connector:1.6.0-M1-s_2.10
>
> I need to convert a timestamp string into a unix epoch timestamp. The
> unix_timestamp() function assumes the current time zone. However, my
> string data is UTC and encodes the time zone as zero. I have not been able
> to find a way to get the unix time calculated correctly. SimpleDateFormat
> does not have good time zone support. Any suggestions?
>
> I could write a UDF to adjust for time zones, however this seems like
> a hack.
>
> I tried using to_utc_timestamp(created, 'gmt'), however this creates a
> timestamp. I have not been able to figure out how to convert this to a unix
> timestamp, i.e. a long representing the epoch.
>
> Any suggestions?
>
> stmnt = "select \
> row_key, created, count, unix_timestamp(created) as
> unixTimeStamp, \
> unix_timestamp(created, 'yyyy-MM-dd HH:mm:ss.z') as etc \
>  from \
> rawTable \
>  where \
>  (created > '{0}') and (created <= '{1}') \
>  and \
>  (row_key = 'blue' \
> or row_key = 'red' \
> )".format('2016-03-12 00:30:00+', '2016-03-12
> 04:30:00+’)
>
>
> Sample out put
>
> root
>  |-- row_key: string (nullable = true)
>  |-- created: timestamp (nullable = true)
>  |-- count: long (nullable = true)
>  |-- unixTimeStamp: long (nullable = true)
>  |-- etc: long (nullable = true)
>
> 2016-03-12 00:30:30.0 should be 1457742630 not 1457771430
>
> +-+-+-+-+--+
> |row_key  |created|count|unixTimeStamp|utc|
> +-+-+-+-+--+
> |red|2016-03-12 00:30:30.0|2|1457771430   |1457771430|
> |red|2016-03-12 00:30:45.0|1|1457771445   |1457771445|
>
>
>
> static Column unix_timestamp(Column s)
> Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in
> seconds), using the default timezone and the default locale; returns null if
> it fails.
>
> static Column unix_timestamp(Column s, java.lang.String p)
> Converts time string with the given pattern (see
> http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html)
> to Unix timestamp (in seconds); returns null if it fails.
>


Re: sql timestamp timezone bug

2016-03-19 Thread Davies Liu
On Thu, Mar 17, 2016 at 3:02 PM, Andy Davidson
 wrote:
> I am using pyspark 1.6.0 and
> datastax:spark-cassandra-connector:1.6.0-M1-s_2.10 to analyze time series
> data
>
> The data is originally captured by a spark streaming app and written to
> Cassandra. The value of the timestamp comes from
>
> Rdd.foreachRDD(new VoidFunction2()
> …});
>
> I am confident the time stamp is stored correctly in cassandra and that
> the clocks on the machines in my cluster are set correctly
>
> I noticed that if I used Cassandra CQLSH to select a data set between two
> points in time, the row count did not match the row count I got when I did
> the same select in Spark using SQL. It appears that Spark SQL assumes all
> timestamp strings are in the local time zone.
>
>
> Here is what I expect. (this is what is returned by CQLSH)
> cqlsh> select
>... count(row_key) as num_samples, sum(count) as total, max(count)
> as max
>... from
>... notification.json_timeseries
>... where
>... row_key in ('red', 'blue')
>... and created > '2016-03-12 00:30:00+'
>... and created <= '2016-03-12 04:30:00+'
>... allow filtering;
>
>  num_samples | total| max
> -+--+---
> 3242 |11277 |  17
>
>
> Here is my pyspark select statement. Notice the 'created' column encodes
> the timezone. I am running this on my local mac (in PST timezone) and
> connecting to my data center (which runs on UTC) over a VPN.
>
> rawDF = sqlContext.read\
> .format("org.apache.spark.sql.cassandra")\
> .options(table="json_timeseries", keyspace="notification")\
> .load()
>
>
> rawDF.registerTempTable(tmpTableName)
>
>
>
> stmnt = "select \
> row_key, created, count, unix_timestamp(created) as unixTimeStamp, \
> unix_timestamp(created, 'yyyy-MM-dd HH:mm:ss.z') as hack, \
> to_utc_timestamp(created, 'gmt') as gmt \
> from \
> rawTable \
> where \
> (created > '{0}') and (created <= '{1}') \
> and \
> (row_key = 'red' or row_key = 'blue') \
> )".format('2016-03-12 00:30:00+', '2016-03-12 04:30:00+')
>
> rawDF = sqlCtx.sql(stmnt).cache()

What's the type of `created`? TimestampType?

If yes, when created is compared to a string, it will be casted into
string, then compared as string, it become

cast(created, as string) > '2016-03-12 00:30:00+'

Could you try this

sqlCtx.sql("select created, cast(created as string) from rawTable").show()



>
>
>
> I get different values for row count, max, …
>
> If I convert the UTC time stamp string to my local timezone the row count
> matches the count returned by  cqlsh
>
> # pst works, matches cassandra cqlsh
> # .format('2016-03-11 16:30:00+', '2016-03-11 20:30:00+')
>
> Am I doing something wrong in my pyspark code?
>
>
> Kind regards
>
> Andy
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: sql timestamp timezone bug

2016-03-19 Thread Davies Liu
In Spark SQL, a timestamp is the number of microseconds since the epoch, so
it has nothing to do with the timezone.

When you compare it against a unix_timestamp or a string, it's better to
convert these into timestamps and then compare them.

In your case, the where clause should be:

where (created > cast('{0}' as timestamp)) and (created <=  cast('{1}'
as timestamp))

Could you try this?

On Fri, Mar 18, 2016 at 11:10 AM, Andy Davidson
 wrote:
> Hi Davies
>
>
>
> What's the type of `created`? TimestampType?
>
>
>
> The ‘created’ column in cassandra is a timestamp
> https://docs.datastax.com/en/cql/3.0/cql/cql_reference/timestamp_type_r.html
>
> In the spark data frame it is a a timestamp
>
>
> If yes, when created is compared to a string, it will be casted into
> string, then compared as string, it become
>
> cast(created, as string) > '2016-03-12 00:30:00+'
>
> Could you try this
>
> sqlCtx.sql("select created, cast(created as string) from rawTable").show()
>
>
>
> I am not sure I understand your suggestion. In my where clause the date
> range is specified using string literals. I need the value of created to be
> a timestamp.
>
> # http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html
> stmnt = "select \
> row_key, created,  cast(created as string), count,
> unix_timestamp(created) as unixTimeStamp, \
> unix_timestamp(created, 'yyyy-MM-dd HH:mm:ss.zz') as aedwip, \
> to_utc_timestamp(created, 'gmt') as gmt \
>  from \
> rawTable \
>  where \
>  (created > '{0}') and (created <= '{1}') \
>  and \
>  (row_key = 'red' \
> or row_key = 'blue' )".format('2016-03-12
> 00:30:00+', '2016-03-12 04:30:00+')
>
> rawDF = sqlContext.read\
> .format("org.apache.spark.sql.cassandra")\
> .options(table="json_timeseries", keyspace="notification")\
> .load()
> rawDF.registerTempTable(tmpTableName)
> rawDF = sqlCtx.sql(stmnt).cache()
>
>
> The timestamps are still not UTC; they are in PST
>
> root
>  |-- row_key: string (nullable = true)
>  |-- created: timestamp (nullable = true)
>  |-- created: string (nullable = true)
>  |-- count: long (nullable = true)
>  |-- unixTimeStamp: long (nullable = true)
>  |-- aedwip: long (nullable = true)
>  |-- gmt: timestamp (nullable = true)
>
> +-+-+---+-+-+--+-+
> |row_key  |created  |created
> |count|unixTimeStamp|aedwip|gmt  |
> +-+-+---+-+-+--+-+
> |blue |2016-03-12 00:30:30.0|2016-03-12 00:30:30|2|1457771430
> |1457771430|2016-03-12 00:30:30.0|
> |blue |2016-03-12 00:30:45.0|2016-03-12 00:30:45|1|1457771445
> |1457771445|2016-03-12 00:30:45.0|
> |blue |2016-03-12 00:31:00.0|2016-03-12 00:31:00|1|1457771460
> |1457771460|2016-03-12 00:31:00.0|
> |
>
>
> Kind regards
>
> Andy

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Partition RDD by key to create DataFrames

2016-03-15 Thread Davies Liu
I think you could create a DataFrame with schema (mykey, value1,
value2), then partition it by mykey when saving as parquet.

r2 = rdd.map((k, v) => Row(k, v._1, v._2))
df  = sqlContext.createDataFrame(r2, schema)
df.write.partitionBy("myKey").parquet(path)
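
A runnable PySpark sketch of the same idea (the toy data, schema and output
path here are illustrative assumptions, not from the thread):

    from pyspark.sql import Row

    pair_rdd = sc.parallelize([("k1", ("a", "b")), ("k2", ("c", "d"))])
    rows = pair_rdd.map(lambda kv: Row(mykey=kv[0], V1=kv[1][0], V2=kv[1][1]))
    df = sqlContext.createDataFrame(rows)
    # each distinct value of mykey becomes its own sub-directory of Parquet files
    df.write.partitionBy("mykey").parquet("/tmp/mykey_partitioned")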


On Tue, Mar 15, 2016 at 10:33 AM, Mohamed Nadjib MAMI
 wrote:
> Hi,
>
> I have a pair RDD of the form: (mykey, (value1, value2))
>
> How can I create a DataFrame having the schema [V1 String, V2 String] to
> store [value1, value2] and save it into a Parquet table named "mykey"?
>
> createDataFrame() method takes an RDD and a schema (StructType) in
> parameters. The schema is known up front ([V1 String, V2 String]), but
> getting an RDD by partitioning the original RDD based on the key is what I
> can't get my head around so far.
>
> Similar questions have been around (like
> http://stackoverflow.com/questions/25046199/apache-spark-splitting-pair-rdd-into-multiple-rdds-by-key-to-save-values)
> but they do not use DataFrames.
>
> Thanks in advance!
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Python unit tests - Unable to run it with Python 2.6 or 2.7

2016-03-11 Thread Davies Liu
Spark 2.0 is dropping support for Python 2.6; it only works with
Python 2.7 and 3.4+.

On Thu, Mar 10, 2016 at 11:17 PM, Gayathri Murali
 wrote:
> Hi all,
>
> I am trying to run python unit tests.
>
> I currently have Python 2.6 and 2.7 installed. I installed unittest2 against 
> both of them.
>
> When I try to run /python/run-tests with Python 2.7 I get the following error 
> :
>
> Please install unittest2 to test with Python 2.6 or earlier
> Had test failures in pyspark.sql.tests with python2.6; see logs.
>
> When I try to run /python/run-tests with Python 2.6 I get the following error:
>
> Traceback (most recent call last):
>   File "./python/run-tests.py", line 42, in 
> from sparktestsupport.modules import all_modules  # noqa
>   File "/Users/gayathri/spark/python/../dev/sparktestsupport/modules.py", 
> line 18, in 
> from functools import total_ordering
> ImportError: cannot import name total_ordering
>
> total_ordering is a package that is available in 2.7.
>
> Can someone help?
>
> Thanks
> Gayathri
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Job Hanging on Join

2016-02-22 Thread Davies Liu
This link may help:
https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html

Spark 1.6 improved the CartesianProduct; you should turn off auto
broadcast and go with CartesianProduct in 1.6.
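
A hedged workaround sketch (not something suggested in this thread): the
OR/isnull join condition shown in the plan below cannot be planned as an
equi-join, which is why Spark falls back to BroadcastNestedLoopJoin. Replacing
NULL keys with a sentinel on both sides keeps the null-matches-null semantics
while allowing a hash-based equi-join. "dim" and "updates" are stand-ins for
the two inputs:

    from pyspark.sql.functions import coalesce, lit

    dim2 = dim.withColumn("cid_key", coalesce(dim["customer_id"], lit(-1))) \
              .withColumn("country_key", coalesce(dim["country"], lit("__NULL__")))
    upd2 = updates.withColumn("cid_key", coalesce(updates["customer_id"], lit(-1))) \
                  .withColumn("country_key", coalesce(updates["country"], lit("__NULL__")))
    joined = dim2.join(upd2,
                       (dim2["cid_key"] == upd2["cid_key"]) &
                       (dim2["country_key"] == upd2["country_key"]),
                       "left_outer")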

On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali  wrote:
> Hello everyone,
>
> I'm working with Tamara and I wanted to give you guys an update on the
> issue:
>
> 1. Here is the output of .explain():
>>
>> Project
>> [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L
>> AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS
>> new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS
>> new_gender#42,fk_created_at_date#32 AS
>> new_fk_created_at_date#43,age_range#30 AS new_age_range#44,first_name#27 AS
>> new_first_name#45,last_name#28 AS new_last_name#46]
>>  BroadcastNestedLoopJoin BuildLeft, LeftOuter, Some((((customer_id#1L =
>> customer_id#25L) || (isnull(customer_id#1L) && isnull(customer_id#25L))) &&
>> ((country#2 = country#24) || (isnull(country#2) && isnull(country#24)))))
>>   Scan
>> PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
>>   Scan
>> ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
>
>
> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a difference.
> It still hangs indefinitely.
> 3. We are using Spark 1.5.2
> 4. We tried running this with 4 executors, 9 executors, and even in local
> mode with master set to "local[4]". The issue still persists in all cases.
> 5. Even without trying to cache any of the dataframes this issue still
> happens.
> 6. We have about 200 partitions.
>
> Any help would be appreciated!
>
> Best Regards,
> Mo
>
> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta 
> wrote:
>>
>> Sorry,
>>
>> please include the following questions to the list above:
>>
>> the SPARK version?
>> whether you are using RDD or DataFrames?
>> is the code run locally or in SPARK Cluster mode or in AWS EMR?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
>>  wrote:
>>>
>>> Hi Tamara,
>>>
>>> few basic questions first.
>>>
>>> How many executors are you using?
>>> Is the data getting all cached into the same executor?
>>> How many partitions do you have of the data?
>>> How many fields are you trying to use in the join?
>>>
>>> If you need any help in finding answer to these questions please let me
>>> know. From what I reckon joins like yours should not take more than a few
>>> milliseconds.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt  wrote:

 Hi all,

 I am running a Spark job that gets stuck attempting to join two
 dataframes. The dataframes are not very large, one is about 2 M rows, and
 the other a couple of thousand rows and the resulting joined dataframe
 should be about the same size as the smaller dataframe. I have tried
 triggering execution of the join using the 'first' operator, which as far 
 as
 I understand would not require processing the entire resulting dataframe
 (maybe I am mistaken though). The Spark UI is not telling me anything, just
 showing the task to be stuck.

 When I run the exact same job on a slightly smaller dataset it works
 without hanging.

 I have used the same environment to run joins on much larger dataframes,
 so I am confused as to why in this particular case my Spark job is just
 hanging. I have also tried running the same join operation using pyspark on
 two 2 Million row dataframes (exactly like the one I am trying to join in
 the job that gets stuck) and it runs succesfully.

 I have tried caching the joined dataframe to see how much memory it is
 requiring but the job gets stuck on this action too. I have also tried 
 using
 persist to memory and disk on the join, and the job seems to be stuck all
 the same.

 Any help as to where to look for the source of the problem would be much
 appreciated.

 Cheers,

 Tamara

>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Pyspark - how to use UDFs with dataframe groupby

2016-02-10 Thread Davies Liu
Short answer: PySpark does not support UDAFs (user-defined aggregate
functions) for now.
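
Not a true UDAF, but a common workaround sketch: collect the values per group
with the built-in collect_list (exposed as a DataFrame function in recent
releases; on 1.6 it may require a HiveContext since it is backed by the Hive
UDAF there). "d" below is a stand-in for the value column being gathered:

    from pyspark.sql.functions import collect_list

    dfAgg = df.groupBy("a", "b", "c").agg(collect_list("d").alias("d_list"))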

On Tue, Feb 9, 2016 at 11:44 PM, Viktor ARDELEAN 
wrote:

> Hello,
>
> I am using following transformations on RDD:
>
> rddAgg = df.map(lambda l: (Row(a = l.a, b= l.b, c = l.c), l))\
>.aggregateByKey([], lambda accumulatorList, value: accumulatorList 
> + [value], lambda list1, list2: [list1] + [list2])
>
> I want to use the dataframe groupBy + agg transformation instead of map + 
> aggregateByKey because as far as I know dataframe transformations are faster 
> than RDD transformations.
>
> I just can't figure out how to use custom aggregate functions with agg.
>
> *First step is clear:*
>
> groupedData = df.groupBy("a","b","c")
>
> *Second step is not very clear to me:*
>
> dfAgg = groupedData.agg(<some custom aggregate function that collects the values into a list and merges it?>)
>
> The agg documentations says the following:
> agg(**exprs*)
> 
>
> Compute aggregates and returns the result as a DataFrame
> 
> .
>
> The available aggregate functions are avg, max, min, sum, count.
>
> If exprs is a single dict mapping from string to string, then the key is
> the column to perform aggregation on, and the value is the aggregate
> function.
>
> Alternatively, exprs can also be a list of aggregate Column
> 
>  expressions.
> Parameters: *exprs* – a dict mapping from column name (string) to
> aggregate functions (string), or a list of Column
> 
> .
>
> Thanks for help!
> --
> Viktor
>
> *P*   Don't print this email, unless it's really necessary. Take care of
> the environment.
>


Re: [discuss] dropping Python 2.6 support

2016-01-05 Thread Davies Liu
+1

On Tue, Jan 5, 2016 at 5:45 AM, Nicholas Chammas
 wrote:
> +1
>
> Red Hat supports Python 2.6 on REHL 5 until 2020, but otherwise yes, Python
> 2.6 is ancient history and the core Python developers stopped supporting it
> in 2013. REHL 5 is not a good enough reason to continue support for Python
> 2.6 IMO.
>
> We should aim to support Python 2.7 and Python 3.3+ (which I believe we
> currently do).
>
> Nick
>
> On Tue, Jan 5, 2016 at 8:01 AM Allen Zhang  wrote:
>>
>> plus 1,
>>
>> we are currently using python 2.7.2 in production environment.
>>
>>
>>
>>
>>
>> 在 2016-01-05 18:11:45,"Meethu Mathew"  写道:
>>
>> +1
>> We use Python 2.7
>>
>> Regards,
>>
>> Meethu Mathew
>>
>> On Tue, Jan 5, 2016 at 12:47 PM, Reynold Xin  wrote:
>>>
>>> Does anybody here care about us dropping support for Python 2.6 in Spark
>>> 2.0?
>>>
>>> Python 2.6 is ancient, and is pretty slow in many aspects (e.g. json
>>> parsing) when compared with Python 2.7. Some libraries that Spark depend on
>>> stopped supporting 2.6. We can still convince the library maintainers to
>>> support 2.6, but it will be extra work. I'm curious if anybody still uses
>>> Python 2.6 to run Spark.
>>>
>>> Thanks.
>>>
>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [discuss] dropping Python 2.6 support

2016-01-05 Thread Davies Liu
Created JIRA: https://issues.apache.org/jira/browse/SPARK-12661

On Tue, Jan 5, 2016 at 2:49 PM, Koert Kuipers  wrote:
> i do not think so.
>
> does the python 2.7 need to be installed on all slaves? if so, we do not
> have direct access to those.
>
> also, spark is easy for us to ship with our software since its apache 2
> licensed, and it only needs to be present on the machine that launches the
> app (thanks to yarn).
> even if python 2.7 was needed only on this one machine that launches the app
> we can not ship it with our software because its gpl licensed, so the client
> would have to download it and install it themselves, and this would mean its
> an independent install which has to be audited and approved and now you are
> in for a lot of fun. basically it will never happen.
>
>
> On Tue, Jan 5, 2016 at 5:35 PM, Josh Rosen  wrote:
>>
>> If users are able to install Spark 2.0 on their RHEL clusters, then I
>> imagine that they're also capable of installing a standalone Python
>> alongside that Spark version (without changing Python systemwide). For
>> instance, Anaconda/Miniconda make it really easy to install Python 2.7.x/3.x
>> without impacting / changing the system Python and doesn't require any
>> special permissions to install (you don't need root / sudo access). Does
>> this address the Python versioning concerns for RHEL users?
>>
>> On Tue, Jan 5, 2016 at 2:33 PM, Koert Kuipers  wrote:
>>>
>>> yeah, the practical concern is that we have no control over java or
>>> python version on large company clusters. our current reality for the vast
>>> majority of them is java 7 and python 2.6, no matter how outdated that is.
>>>
>>> i dont like it either, but i cannot change it.
>>>
>>> we currently don't use pyspark so i have no stake in this, but if we did
>>> i can assure you we would not upgrade to spark 2.x if python 2.6 was
>>> dropped. no point in developing something that doesnt run for majority of
>>> customers.
>>>
>>> On Tue, Jan 5, 2016 at 5:19 PM, Nicholas Chammas
>>>  wrote:

 As I pointed out in my earlier email, RHEL will support Python 2.6 until
 2020. So I'm assuming these large companies will have the option of riding
 out Python 2.6 until then.

 Are we seriously saying that Spark should likewise support Python 2.6
 for the next several years? Even though the core Python devs stopped
 supporting it in 2013?

 If that's not what we're suggesting, then when, roughly, can we drop
 support? What are the criteria?

 I understand the practical concern here. If companies are stuck using
 2.6, it doesn't matter to them that it is deprecated. But balancing that
 concern against the maintenance burden on this project, I would say that
 "upgrade to Python 2.7 or stay on Spark 1.6.x" is a reasonable position to
 take. There are many tiny annoyances one has to put up with to support 2.6.

 I suppose if our main PySpark contributors are fine putting up with
 those annoyances, then maybe we don't need to drop support just yet...

 Nick
 2016년 1월 5일 (화) 오후 2:27, Julio Antonio Soto de Vicente
 님이 작성:
>
> Unfortunately, Koert is right.
>
> I've been in a couple of projects using Spark (banking industry) where
> CentOS + Python 2.6 is the toolbox available.
>
> That said, I believe it should not be a concern for Spark. Python 2.6
> is old and busted, which is totally opposite to the Spark philosophy IMO.
>
>
> El 5 ene 2016, a las 20:07, Koert Kuipers  escribió:
>
> rhel/centos 6 ships with python 2.6, doesnt it?
>
> if so, i still know plenty of large companies where python 2.6 is the
> only option. asking them for python 2.7 is not going to work
>
> so i think its a bad idea
>
> On Tue, Jan 5, 2016 at 1:52 PM, Juliet Hougland
>  wrote:
>>
>> I don't see a reason Spark 2.0 would need to support Python 2.6. At
>> this point, Python 3 should be the default that is encouraged.
>> Most organizations acknowledge the 2.7 is common, but lagging behind
>> the version they should theoretically use. Dropping python 2.6
>> support sounds very reasonable to me.
>>
>> On Tue, Jan 5, 2016 at 5:45 AM, Nicholas Chammas
>>  wrote:
>>>
>>> +1
>>>
>>> Red Hat supports Python 2.6 on REHL 5 until 2020, but otherwise yes,
>>> Python 2.6 is ancient history and the core Python developers stopped
>>> supporting it in 2013. REHL 5 is not a good enough reason to continue
>>> support for Python 2.6 IMO.
>>>
>>> We should aim to support Python 2.7 and Python 3.3+ (which I believe
>>> we currently do).
>>>
>>> Nick
>>>
>>> On Tue, Jan 5, 2016 at 8:01 AM Allen Zhang 

Re: Problem with WINDOW functions?

2015-12-30 Thread Davies Liu
Window functions are improved in the 1.6 release; could you try 1.6-RC4
(or wait until next week for the final release)?

Even in 1.6, the buffer of rows for window functions does not support
spilling (and also does not use memory efficiently); there is a JIRA for
it: https://issues.apache.org/jira/browse/SPARK-12295
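
For reference, a PySpark rendering of the poster's query using the DataFrame
window API (a sketch only; the same memory caveats apply):

    from pyspark.sql import Window
    from pyspark.sql.functions import dense_rank, col, upper

    d1 = sqlContext.read.parquet("/data/flash/spark/dat14sn") \
                   .where(upper(col("project")) == "EN")
    w = Window.partitionBy("day").orderBy(col("pageviews").desc())
    top20 = d1.select("day", "page", dense_rank().over(w).alias("rank")) \
              .where(col("rank") <= 20)
    top20.orderBy("day", "rank").show()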

On Tue, Dec 29, 2015 at 5:28 PM, vadimtk  wrote:
> Hi,
>
> I can't successfully execute a query with WINDOW function.
>
> The statements are following:
>
> val orcFile =
> sqlContext.read.parquet("/data/flash/spark/dat14sn").filter("upper(project)='EN'")
> orcFile.registerTempTable("d1")
>  sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day ORDER
> BY pageviews DESC) as rank FROM d1").filter("rank <=
> 20").sort($"day",$"rank").collect().foreach(println)
>
> with default
> spark.driver.memory
>
> I am getting java.lang.OutOfMemoryError: Java heap space.
> The same if I set spark.driver.memory=10g.
>
> When I set spark.driver.memory=45g (the box has 256GB of RAM) the execution
> fails with a different error:
>
> 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no recent
> heartbeats: 129324 ms exceeds timeout 12 ms
>
> And I see that GC takes a lot of time.
>
> What is a proper way to execute statements above?
>
> I see the similar problems reported
> http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-fetchfailedexception
> http://stackoverflow.com/questions/32544478/spark-memory-settings-for-count-action-in-a-big-table
>
>
>
>
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tp25833.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: trouble understanding data frame memory usage "java.io.IOException: Unable to acquire memory"

2015-12-29 Thread Davies Liu
Hi Andy,  

Could you change the logging level to INFO and post some of the logs here? There will be some
logging about the memory usage of a task when it OOMs.

In 1.6, the memory for a task is : (HeapSize  - 300M) * 0.75 / number of tasks. 
Is it possible that the heap is too small?
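
Plugging in the numbers from this thread (the unit test below runs with
-Xmx set to 1024M and master local[2], i.e. 2 concurrent tasks):

    heap_mb = 1024
    concurrent_tasks = 2
    per_task_mb = (heap_mb - 300) * 0.75 / concurrent_tasks
    print(per_task_mb)  # ~271 MB of execution memory available per task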

Davies  

--  
Davies Liu
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Tuesday, December 29, 2015, at 4:28 PM, Andy Davidson wrote:

> Hi Michael
>  
> https://github.com/apache/spark/archive/v1.6.0.tar.gz
>  
> Both 1.6.0 and 1.5.2 my unit test work when I call reparation(1) before 
> saving output. Coalesce still fails.  
>  
> Coalesce(1) spark-1.5.2
> Caused by:
> java.io.IOException: Unable to acquire 33554432 bytes of memory
>  
>  
> Coalesce(1) spark-1.6.0
>  
> Caused by:  
> java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
>  
> Hope this helps
>  
> Andy
>  
> From: Michael Armbrust <mich...@databricks.com 
> (mailto:mich...@databricks.com)>
> Date: Monday, December 28, 2015 at 2:41 PM
> To: Andrew Davidson <a...@santacruzintegration.com 
> (mailto:a...@santacruzintegration.com)>
> Cc: "user @spark" <user@spark.apache.org (mailto:user@spark.apache.org)>
> Subject: Re: trouble understanding data frame memory usage
> "java.io.IOException: Unable to acquire memory"
>  
> > Unfortunately in 1.5 we didn't force operators to spill when ran out of 
> > memory so there is not a lot you can do. It would be awesome if you could 
> > test with 1.6 and see if things are any better?
> >  
> > On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson 
> > <a...@santacruzintegration.com (mailto:a...@santacruzintegration.com)> 
> > wrote:
> > > I am using spark 1.5.1. I am running into some memory problems with a
> > > java unit test. Yes, I could fix it by setting -Xmx (it's set to 1024M), however
> > > I want to better understand what is going on so I can write better
> > > code in the future. The test runs on a Mac, master="Local[2]"
> > >  
> > > I have a java unit test that starts by reading a 672K ascii file. My
> > > output data file is 152K. It seems strange that such a small amount of
> > > data would cause an out-of-memory exception. I am running a pretty
> > > standard machine learning process:
> > >  
> > > Load data
> > > create a ML pipeline
> > > transform the data
> > > Train a model
> > > Make predictions
> > > Join the predictions back to my original data set
> > > Coalesce(1), I only have a small amount of data and want to save it in a 
> > > single file
> > > Save final results back to disk
> > >  
> > >  
> > > Step 7: I am unable to call Coalesce() “java.io.IOException: Unable to 
> > > acquire memory”
> > >  
> > > To try and figure out what is going I put log messages in to count the 
> > > number of partitions
> > >  
> > > Turns out I have 20 input files, each one winds up in a separate 
> > > partition. Okay so after loading I call coalesce(1) and check to make 
> > > sure I only have a single partition.
> > >  
> > > The total number of observations is 1998.
> > >  
> > > After calling step 7 I count the number of partitions and discovered I
> > > have 224 partitions! Surprising, given I called coalesce(1) before I did
> > > anything with the data. My data set should easily fit in memory. When I
> > > save them to disk I get 202 files created, with 162 of them being empty!
> > >  
> > > In general I am not explicitly using cache.
> > >  
> > > Some of the data frames get registered as tables. I find it easier to use 
> > > sql.
> > >  
> > > Some of the data frames get converted back to RDDs. I find it easier to 
> > > create RDD this way
> > >  
> > > I put calls to unpersist(true). In several places
> > >  
> > >  
> > > private void memoryCheck(String name) {
> > >     Runtime rt = Runtime.getRuntime();
> > >     logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {} df.size: {}",
> > >         name,
> > >         String.format("%,d", rt.totalMemory()),
> > >         String.format("%,d", rt.freeMemory()));
> > > }
> > >  
> > >  
> > &g

Re: Does Spark SQL support rollup like HQL

2015-12-29 Thread Davies Liu
Just sent out a PR [1] to support cube/rollup as functions; it works
with both SQLContext and HiveContext.

https://github.com/apache/spark/pull/10522/files
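
For completeness, the DataFrame-side rollup mentioned later in this thread can
be written in PySpark as a short sketch (df is assumed to have columns a, b, c):

    rolled = df.rollup("a", "b").agg({"c": "sum"})
    rolled.show()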

On Tue, Dec 29, 2015 at 9:35 PM, Yi Zhang  wrote:
> Hi Hao,
>
> Thanks. I'll take a look at it.
>
>
> On Wednesday, December 30, 2015 12:47 PM, "Cheng, Hao" 
> wrote:
>
>
> Hi, currently the simple SQL parser of SQLContext is quite weak and
> doesn't support rollup, but you can check the code in
>
> https://github.com/apache/spark/pull/5080/ , which aimed to add that support,
> in case you want to patch it into your own branch.
>
> In Spark 2.0, the simple SQL parser will be replaced by the HQL parser, so it
> will not be a problem then.
>
> Hao
>
> From: Yi Zhang [mailto:zhangy...@yahoo.com.INVALID]
> Sent: Wednesday, December 30, 2015 11:41 AM
> To: User
> Subject: Does Spark SQL support rollup like HQL
>
> Hi guys,
>
> As we know, HiveContext supports rollup like this:
>
> hiveContext.sql("select a, b, sum(c) from t group by a, b with rollup")
>
> And I also know that DataFrame provides a rollup function to support it:
>
> dataframe.rollup($"a", $"b").agg(Map("c" -> "sum"))
>
> But in my scenario, I'd rather use SQL syntax in SQLContext to express the
> rollup, the way HiveContext does. Any suggestion?
>
> Thanks.
>
> Regards,
> Yi Zhang
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Content based window operation on Time-series data

2015-12-17 Thread Davies Liu
Could you try this?

df.groupBy(((col("timeStamp") - start) / bucketLengthSec).cast(IntegerType())) \
  .agg(max("timeStamp"), max("value")).collect()
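
An expanded, self-contained sketch of that suggestion (start and bucketLengthSec
are assumed to hold the values defined in the quoted code below):

from pyspark.sql.functions import col, max as max_
from pyspark.sql.types import IntegerType

# assign each row to a window by integer division of its offset from `start`
bucket = ((col("timeStamp") - start) / bucketLengthSec).cast(IntegerType()).alias("bucket")

rows = (df.groupBy(bucket)
          .agg(max_("timeStamp").alias("x"), max_("value").alias("y"))
          .collect())
# each result row now holds the max timestamp/value of one window, computed in parallel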

On Wed, Dec 9, 2015 at 8:54 AM, Arun Verma  wrote:
> Hi all,
>
> We have RDD(main) of sorted time-series data. We want to split it into
> different RDDs according to window size and then perform some aggregation
> operation like max, min etc. over each RDD in parallel.
>
> If the window size is w, then the ith RDD has data from (startTime + (i-1)*w) to
> (startTime + i*w), where startTime is the time-stamp of the 1st entry in the main RDD and
> (startTime + (i-1)*w) is greater than the last entry of the main RDD.
>
> For now, I am using DataFrame and Spark version 1.5.2. The code below runs
> sequentially on the data, so execution time is high and resource utilization
> is low:
> /*
> * aggragator is max
> * df - Dataframe has sorted timeseries data
> * start - first entry of DataFrame
> * end - last entry of DataFrame df
> * bucketLengthSec - window size
> * stepResults - has particular block/window output(JSON)
> * appendResults - has output till this block/window(JSON)
> */
> while (start <= end) {
>     row = df.filter(df.col("timeStamp")
>             .between(start, nextStart))
>             .agg(max(df.col("timeStamp")), max(df.col("value")))
>             .first();
>     if (row.get(0) != null) {
>         stepResults = new JSONObject();
>         stepResults.put("x", Long.parseLong(row.get(0).toString()));
>         stepResults.put("y", row.get(1));
>         appendResults.add(stepResults);
>     }
>     start = nextStart;
>     nextStart = start + bucketLengthSec;
> }
>
>
> --
> Thanks and Regards,
> Arun Verma

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL 1.3 not finding attribute in DF

2015-12-07 Thread Davies Liu
Could you reproduce this problem in 1.5 or 1.6?

On Sun, Dec 6, 2015 at 12:29 AM, YaoPau  wrote:
> If anyone runs into the same issue, I found a workaround:
>
 df.where('state_code = "NY"')
>
> works for me.
>
 df.where(df.state_code == "NY").collect()
>
> fails with the error from the first post.
>
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-3-not-finding-attribute-in-DF-tp25599p25600.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: UDF with 2 arguments

2015-11-25 Thread Davies Liu
It works in master (1.6), what's the version of Spark you have?

>>> from pyspark.sql.functions import udf
>>> def f(a, b): pass
...
>>> my_udf = udf(f)
>>> from pyspark.sql.types import *
>>> my_udf = udf(f, IntegerType())


On Wed, Nov 25, 2015 at 12:01 PM, Daniel Lopes  wrote:
> Hallo,
>
> supose I have function in pyspark that
>
> def function(arg1,arg2):
>   pass
>
> and
>
> udf_function = udf(function, IntegerType())
>
> that takes me error
>
> Traceback (most recent call last):
>   File "", line 1, in 
> TypeError: __init__() takes at least 2 arguments (1 given)
>
>
> How I use?
>
> Best,
>
>
> --
> Daniel Lopes, B.Eng
> Data Scientist - BankFacil
> CREA/SP 5069410560
> Mob +55 (18) 99764-2733
> Ph +55 (11) 3522-8009
> http://about.me/dannyeuu
>
> Av. Nova Independência, 956, São Paulo, SP
> Bairro Brooklin Paulista
> CEP 04570-001
> https://www.bankfacil.com.br
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL Save CSV with JSON Column

2015-11-24 Thread Davies Liu
I think you could have a Python UDF to turn the properties into a JSON string:

import simplejson
import pyspark.sql.functions

def to_json(row):
    return simplejson.dumps(row.asDict(recursive=True))

to_json_udf = pyspark.sql.functions.udf(to_json)

df.select("col_1", "col_2",
to_json_udf(df.properties)).write.format("com.databricks.spark.csv").save()


On Tue, Nov 24, 2015 at 7:36 AM,   wrote:
> I am generating a set of tables in pyspark SQL from a JSON source dataset. I 
> am writing those tables to disk as CSVs using 
> df.write.format(com.databricks.spark.csv).save(…). I have a schema like:
>
> root
>  |-- col_1: string (nullable = true)
>  |-- col_2: string (nullable = true)
>  |-- col_3: timestamp (nullable = true)
> ...
>  |-- properties: struct (nullable = true)
>  ||-- prop_1: string (nullable = true)
>  ||-- prop_2: string (nullable = true)
>  ||-- prop3: string (nullable = true)
> …
>
> Currently I am dropping the properties section when I write to CSV, but I 
> would like to write it as a JSON column. How can I go about this? My final 
> result would be a CSV with col_1, col_2, col_3 as usual but the ‘properties’ 
> column would contain formatted JSON objects.
>
> Thanks

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to us DataFrame.na.fill based on condition

2015-11-23 Thread Davies Liu
DataFrame.replace(to_replace, value, subset=None)

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace

On Mon, Nov 23, 2015 at 11:05 AM, Vishnu Viswanath
 wrote:
> Hi
>
> Can someone tell me if there is a way I can use the fill method in
> DataFrameNaFunctions based on some condition.
>
> e.g., df.na.fill("value1","column1","condition1")
> df.na.fill("value2","column1","condition2")
>
> i want to fill nulls in column1 with values - either value 1 or value 2,
> based on some condition.
>
> Thanks,

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to us DataFrame.na.fill based on condition

2015-11-23 Thread Davies Liu
You could create a new column based on the expression: IF (condition1,
value1, old_column_value)
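
For example, a hedged sketch using when/otherwise; "cond_col" is a hypothetical
column standing in for whatever drives condition1/condition2 in the question:

from pyspark.sql.functions import when, col

df2 = df.withColumn(
    "column1_filled",
    when(col("column1").isNull() & (col("cond_col") == "condition1"), "value1")
    .when(col("column1").isNull() & (col("cond_col") == "condition2"), "value2")
    .otherwise(col("column1")))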

On Mon, Nov 23, 2015 at 11:57 AM, Vishnu Viswanath
<vishnu.viswanat...@gmail.com> wrote:
> Thanks for the reply Davies
>
> I think replace, replaces a value with another value. But what I want to do
> is fill in the null value of a column.( I don't have a to_replace here )
>
> Regards,
> Vishnu
>
> On Mon, Nov 23, 2015 at 1:37 PM, Davies Liu <dav...@databricks.com> wrote:
>>
>> DataFrame.replace(to_replace, value, subset=None)
>>
>>
>> http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace
>>
>> On Mon, Nov 23, 2015 at 11:05 AM, Vishnu Viswanath
>> <vishnu.viswanat...@gmail.com> wrote:
>> > Hi
>> >
>> > Can someone tell me if there is a way I can use the fill method in
>> > DataFrameNaFunctions based on some condition.
>> >
>> > e.g., df.na.fill("value1","column1","condition1")
>> > df.na.fill("value2","column1","condition2")
>> >
>> > i want to fill nulls in column1 with values - either value 1 or value 2,
>> > based on some condition.
>> >
>> > Thanks,
>
>
>
>
> --
> Thanks and Regards,
> Vishnu Viswanath
> +1 309 550 2311
> www.vishnuviswanath.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: very slow parquet file write

2015-11-13 Thread Davies Liu
Have you used any partitioned columns when writing as json or parquet?

On Fri, Nov 6, 2015 at 6:53 AM, Rok Roskar  wrote:
> yes I was expecting that too because of all the metadata generation and
> compression. But I have not seen performance this bad for other parquet
> files I’ve written and was wondering if there could be something obvious
> (and wrong) to do with how I’ve specified the schema etc. It’s a very simple
> schema consisting of a StructType with a few StructField floats and a
> string. I’m using all the spark defaults for io compression.
>
> I'll see what I can do about running a profiler -- can you point me to a
> resource/example?
>
> Thanks,
>
> Rok
>
> ps: my post on the mailing list is still listed as not accepted by the
> mailing list:
> http://apache-spark-user-list.1001560.n3.nabble.com/very-slow-parquet-file-write-td25295.html
> -- none of your responses are there either. I am definitely subscribed to
> the list though (I get daily digests). Any clue how to fix it?
>
>
>
>
> On Nov 6, 2015, at 9:26 AM, Cheng Lian  wrote:
>
> I'd expect writing Parquet files slower than writing JSON files since
> Parquet involves more complicated encoders, but maybe not that slow. Would
> you mind to try to profile one Spark executor using tools like YJP to see
> what's the hotspot?
>
> Cheng
>
> On 11/6/15 7:34 AM, rok wrote:
>
> Apologies if this appears a second time!
>
> I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions into a
> parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
> the size of file this is way over-provisioned (I've tried it with fewer
> partitions and fewer nodes, no obvious effect). I was expecting the dump to
> disk to be very fast -- the DataFrame is cached in memory and contains just
> 14 columns (13 are floats and one is a string). When I write it out in json
> format, this is indeed reasonably fast (though it still takes a few minutes,
> which is longer than I would expect).
>
> However, when I try to write a parquet file it takes way longer -- the first
> set of tasks finishes in a few minutes, but the subsequent tasks take more
> than twice as long or longer. In the end it takes over half an hour to write
> the file. I've looked at the disk I/O and cpu usage on the compute nodes and
> it looks like the processors are fully loaded while the disk I/O is
> essentially zero for long periods of time. I don't see any obvious garbage
> collection issues and there are no problems with memory.
>
> Any ideas on how to debug/fix this?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/very-slow-parquet-file-write-tp25295.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Distributing Python code packaged as tar balls

2015-11-13 Thread Davies Liu
Python does not support libraries packaged as tar balls, so PySpark may not
support them either.
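
A hedged sketch of the zip-based alternative (the package directory name "mypkg"
is an assumption for illustration):

import shutil

# build mypkg.zip from the package directory; zip is the format --py-files accepts
shutil.make_archive("mypkg", "zip", root_dir=".", base_dir="mypkg")
sc.addPyFile("mypkg.zip")  # or pass it to spark-submit via --py-files mypkg.zip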

On Wed, Nov 4, 2015 at 5:40 AM, Praveen Chundi  wrote:
> Hi,
>
> Pyspark/spark-submit offers a --py-files handle to distribute python code
> for execution. Currently (version 1.5) only zip files seem to be supported; I
> have tried distributing tar balls unsuccessfully.
>
> Is it worth adding support for tar balls?
>
> Best regards,
> Praveen Chundi
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: bin/pyspark SparkContext is missing?

2015-11-13 Thread Davies Liu
You forgot to create a SparkContext instance:

sc = SparkContext()
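
A minimal sketch of what the notebook needs before `sc` can be used (local master
assumed here, matching the shell session quoted below):

from pyspark import SparkContext

sc = SparkContext(master="local[2]", appName="notebook")
textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.txt")
textFile.take(1)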

On Tue, Nov 3, 2015 at 9:59 AM, Andy Davidson
 wrote:
> I am having a heck of a time getting Ipython notebooks to work on my 1.5.1
> AWS cluster I created using spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2
>
> I have read the instructions for using iPython notebook on
> http://spark.apache.org/docs/latest/programming-guide.html#using-the-shell
>
> I want to run the notebook server on my master and use an ssh tunnel to
> connect a web browser running on my mac.
>
> I am confident the cluster is set up correctly because the sparkPi example
> runs.
>
> I am able to use IPython notebooks on my local mac and work with spark and
> local files with out any problems.
>
> I know the ssh tunnel is working.
>
> On my cluster I am able to use python shell in general
>
> [ec2-user@ip-172-31-29-60 dataScience]$ /root/spark/bin/pyspark --master
> local[2]
>
>
 from pyspark import SparkContext
>
 textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.txt")
>
 textFile.take(1)
>
>
>
> When I run the exact same code in iPython notebook I get
>
> ---
> NameError Traceback (most recent call last)
>  in ()
>  11 from pyspark import SparkContext, SparkConf
>  12
> ---> 13 textFile =
> sc.textFile("file:///home/ec2-user/dataScience/readme.txt")
>  14
>  15 textFile.take(1)
>
> NameError: name 'sc' is not defined
>
>
>
>
> To try and debug, I wrote a script to launch pyspark and added 'set -x' to
> pyspark so I could see what the script was doing
>
> Any idea how I can debug this?
>
> Thanks in advance
>
> Andy
>
> $ cat notebook.sh
>
> set -x
>
> export PYSPARK_DRIVER_PYTHON=ipython
>
> export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=7000"
>
> /root/spark/bin/pyspark --master local[2]
>
>
>
>
> [ec2-user@ip-172-31-29-60 dataScience]$ ./notebook.sh
>
> ++ export PYSPARK_DRIVER_PYTHON=ipython
>
> ++ PYSPARK_DRIVER_PYTHON=ipython
>
> ++ export 'PYSPARK_DRIVER_PYTHON_OPTS=notebook --no-browser --port=7000'
>
> ++ PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=7000'
>
> ++ /root/spark/bin/pyspark --master 'local[2]'
>
> +++ dirname /root/spark/bin/pyspark
>
> ++ cd /root/spark/bin/..
>
> ++ pwd
>
> + export SPARK_HOME=/root/spark
>
> + SPARK_HOME=/root/spark
>
> + source /root/spark/bin/load-spark-env.sh
>
>  dirname /root/spark/bin/pyspark
>
> +++ cd /root/spark/bin/..
>
> +++ pwd
>
> ++ FWDIR=/root/spark
>
> ++ '[' -z '' ']'
>
> ++ export SPARK_ENV_LOADED=1
>
> ++ SPARK_ENV_LOADED=1
>
>  dirname /root/spark/bin/pyspark
>
> +++ cd /root/spark/bin/..
>
> +++ pwd
>
> ++ parent_dir=/root/spark
>
> ++ user_conf_dir=/root/spark/conf
>
> ++ '[' -f /root/spark/conf/spark-env.sh ']'
>
> ++ set -a
>
> ++ . /root/spark/conf/spark-env.sh
>
> +++ export JAVA_HOME=/usr/java/latest
>
> +++ JAVA_HOME=/usr/java/latest
>
> +++ export SPARK_LOCAL_DIRS=/mnt/spark,/mnt2/spark
>
> +++ SPARK_LOCAL_DIRS=/mnt/spark,/mnt2/spark
>
> +++ export SPARK_MASTER_OPTS=
>
> +++ SPARK_MASTER_OPTS=
>
> +++ '[' -n 1 ']'
>
> +++ export SPARK_WORKER_INSTANCES=1
>
> +++ SPARK_WORKER_INSTANCES=1
>
> +++ export SPARK_WORKER_CORES=2
>
> +++ SPARK_WORKER_CORES=2
>
> +++ export HADOOP_HOME=/root/ephemeral-hdfs
>
> +++ HADOOP_HOME=/root/ephemeral-hdfs
>
> +++ export
> SPARK_MASTER_IP=ec2-54-215-207-132.us-west-1.compute.amazonaws.com
>
> +++ SPARK_MASTER_IP=ec2-54-215-207-132.us-west-1.compute.amazonaws.com
>
>  cat /root/spark-ec2/cluster-url
>
> +++ export
> MASTER=spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077
>
> +++ MASTER=spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077
>
> +++ export SPARK_SUBMIT_LIBRARY_PATH=:/root/ephemeral-hdfs/lib/native/
>
> +++ SPARK_SUBMIT_LIBRARY_PATH=:/root/ephemeral-hdfs/lib/native/
>
> +++ export SPARK_SUBMIT_CLASSPATH=::/root/ephemeral-hdfs/conf
>
> +++ SPARK_SUBMIT_CLASSPATH=::/root/ephemeral-hdfs/conf
>
>  wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname
>
> +++ export
> SPARK_PUBLIC_DNS=ec2-54-215-207-132.us-west-1.compute.amazonaws.com
>
> +++ SPARK_PUBLIC_DNS=ec2-54-215-207-132.us-west-1.compute.amazonaws.com
>
> +++ export YARN_CONF_DIR=/root/ephemeral-hdfs/conf
>
> +++ YARN_CONF_DIR=/root/ephemeral-hdfs/conf
>
>  id -u
>
> +++ '[' 222 == 0 ']'
>
> ++ set +a
>
> ++ '[' -z '' ']'
>
> ++ ASSEMBLY_DIR2=/root/spark/assembly/target/scala-2.11
>
> ++ ASSEMBLY_DIR1=/root/spark/assembly/target/scala-2.10
>
> ++ [[ -d /root/spark/assembly/target/scala-2.11 ]]
>
> ++ '[' -d /root/spark/assembly/target/scala-2.11 ']'
>
> ++ export SPARK_SCALA_VERSION=2.10
>
> ++ SPARK_SCALA_VERSION=2.10
>
> + export '_SPARK_CMD_USAGE=Usage: ./bin/pyspark [options]'
>
> + _SPARK_CMD_USAGE='Usage: ./bin/pyspark [options]'
>
> + hash python2.7
>
> + DEFAULT_PYTHON=python2.7
>
> + [[ -n '' ]]
>
> + [[ '' == \1 ]]
>
> + [[ -z ipython ]]
>
> + 

Re: very slow parquet file write

2015-11-13 Thread Davies Liu
Do you have partitioned columns?

On Thu, Nov 5, 2015 at 2:08 AM, Rok Roskar  wrote:
> I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions into a
> parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
> the size of file this is way over-provisioned (I've tried it with fewer
> partitions and fewer nodes, no obvious effect). I was expecting the dump to
> disk to be very fast -- the DataFrame is cached in memory and contains just
> 14 columns (13 are floats and one is a string). When I write it out in json
> format, this is indeed reasonably fast (though it still takes a few minutes,
> which is longer than I would expect).
>
> However, when I try to write a parquet file it takes way longer -- the first
> set of tasks finishes in a few minutes, but the subsequent tasks take more
> than twice as long or longer. In the end it takes over half an hour to write
> the file. I've looked at the disk I/O and cpu usage on the compute nodes and
> it looks like the processors are fully loaded while the disk I/O is
> essentially zero for long periods of time. I don't see any obvious garbage
> collection issues and there are no problems with memory.
>
> Any ideas on how to debug/fix this?
>
> Thanks!
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL Exception: Conf non-local session path expected to be non-null

2015-10-20 Thread Davies Liu
Thread-local state does not work well with PySpark, because the JVM
thread used by PySpark can change over time, so the SessionState
can be lost.

This should be fixed in master by https://github.com/apache/spark/pull/8909


On Mon, Oct 19, 2015 at 1:08 PM, YaoPau  wrote:
> I've connected Spark SQL to the Hive Metastore and currently I'm running SQL
> code via pyspark.  Typically everything works fine, but sometimes after a
> long-running Spark SQL job I get the error below, and from then on I can no
> longer run Spark SQL commands.  I still do have both my sc and my sqlCtx.
>
> Any idea what this could mean?
>
> An error occurred while calling o36.sql.
> : org.apache.spark.sql.AnalysisException: Conf non-local session path
> expected to be non-null;
> at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:260)
> at
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
> at
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
> at 
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
> at 
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at 
> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
> at
> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
> at
> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
> at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
> at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
> at
> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
> at
> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
> at 
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
> at 
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at 
> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
> at
> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
> at
> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
> at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:235)
> at
> 

Re: best way to generate per key auto increment numerals after sorting

2015-10-19 Thread Davies Liu
What's the issue with groupByKey()?
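
For reference, a hedged sketch of the groupByKey-based numbering described in the
quoted message below; `pairs` stands for the keyed RDD (B in the quoted code), and
the assumption is that the two sort values are the last two fields of each row:

def sort_n_set_position(kv):
    key, rows = kv
    # reverse-sort each group on its last two values, then append a 1-based position
    ranked = sorted(rows, key=lambda r: (r[-2], r[-1]), reverse=True)
    return [tuple(r) + (i + 1,) for i, r in enumerate(ranked)]

numbered = pairs.groupByKey().flatMap(sort_n_set_position)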

On Mon, Oct 19, 2015 at 1:11 AM, fahad shah  wrote:
> Hi
>
> I wanted to ask whats the best way to achieve per key auto increment
> numerals after sorting, for eg. :
>
> raw file:
>
> 1,a,b,c,1,1
> 1,a,b,d,0,0
> 1,a,b,e,1,0
> 2,a,e,c,0,0
> 2,a,f,d,1,0
>
> post-output (the last column is the position number after grouping on
> first three fields and reverse sorting on last two values)
>
> 1,a,b,c,1,1,1
> 1,a,b,d,0,0,3
> 1,a,b,e,1,0,2
> 2,a,e,c,0,0,2
> 2,a,f,d,1,0,1
>
> I am using solution that uses groupbykey but that is running into some
> issues (possibly bug with pyspark/spark?), wondering if there is a
> better way to achieve this.
>
> My solution:
>
> A = A = sc.textFile("train.csv").filter(lambda x:not
> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
> None)
>
> B = A.map(lambda k:
> ((k.first_field,k.second_field,k.first_field,k.third_field),
> (k[0:5]))).groupByKey()
>
> B.map(sort_n_set_position).flatMap(lambda line: line)
>
> where sort and set position iterates over the iterator and performs
> sorting and adding last column.
>
> best fahad
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-19 Thread Davies Liu
Could you simplify the code a little bit so we can reproduce the failure?
(and maybe attach a small sample dataset if it depends on one)

On Sun, Oct 18, 2015 at 10:42 PM, fahad shah  wrote:
>  Hi
>
> I am trying to do pair rdd's, group by the key assign id based on key.
> I am using Pyspark with spark 1.3, for some reason, I am getting this
> error that I am unable to figure out - any help much appreciated.
>
> Things I tried (but to no effect),
>
> 1. make sure I am not doing any conversions on the strings
> 2. make sure that the fields used in the key are all there  and not
> empty string (or else I toss the row out)
>
> My code is along following lines (split is using stringio to parse
> csv, header removes the header row and parse_train is putting the 54
> fields into named tuple after whitespace/quote removal):
>
> #Error for string argument is thrown on the BB.take(1) where the
> groupbykey is evaluated
>
> A = sc.textFile("train.csv").filter(lambda x:not
> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
> None)
>
> A.count()
>
> B = A.map(lambda k:
> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count,
>  k.srch_children_count,k.srch_room_count), (k[0:54])))
> BB = B.groupByKey()
> BB.take(1)
>
>
> best fahad
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark: results differ based on whether persist() has been called

2015-10-19 Thread Davies Liu
This should be fixed by
https://github.com/apache/spark/commit/a367840834b97cd6a9ecda568bb21ee6dc35fcde

Will be released as 1.5.2 soon.

On Mon, Oct 19, 2015 at 9:04 AM, peay2  wrote:
> Hi,
>
> I am getting some very strange results, where I get different results based
> on whether or not I call persist() on a data frame or not before
> materialising it.
>
> There's probably something obvious I am missing, as only very simple
> operations are involved here. Any help with this would be greatly
> appreciated. I have a simple data-frame with IDs and values:
>
> data_dict = {'id': {k: str(k) for k in range(99)}, 'value':
> dict(enumerate(['A'] * 4 + ['B'] * 46 + ['C'] * 49))}
> df_small = pd.DataFrame(data_dict)
> records = sqlContext.createDataFrame(df_small)
> records.printSchema()
>
> # root
> # |-- id: string (nullable = true)
> # |-- value: string (nullable = true)
>
> Now, I left outer join over the IDs -- here, using a dummy constant column
> on the right instead of a separate data-frame (enough to reproduce my
> issue):
>
> unique_ids = records.select("id").dropDuplicates()
> id_names = unique_ids.select(F.col("id").alias("id_join"),
> F.lit("xxx").alias("id_name"))
>
> df_joined = records.join(id_names, records['id'] == id_names['id_join'],
> "left_outer").drop("id_join")
>
> At this point, *doing a show on df_joined* indicates all is fine: all
> records are there as expected, for instance:
>
> df_joined[(df_joined['id'] > 60) & (df_joined['id'] < 70)].show()
> +---+-+---+
> | id|value|id_name|
> +---+-+---+
> | 61|C|xxx|
> | 62|C|xxx|
> | 63|C|xxx|
> | 64|C|xxx|
> ...
>
> However, if I filter for a given value and then group by ID, I do not get
> back all of the groups:
>
> def print_unique_ids(df):
>filtered = df[df["value"] == "C"]
>plan = filtered.groupBy("id").count().select("id")
>unique_ids = list(plan.toPandas()["id"])
>
>print "{0} IDs: {1}\n".format(len(unique_ids), sorted(unique_ids))
>print plan.rdd.toDebugString() + "\n"
>
> print_unique_ids(df_joined.unpersist())
> print_unique_ids(df_joined.persist())
>
> 49 IDs: [u'50', u'51', u'52', u'53', u'54', u'55', u'56', u'57', u'58',
> u'59', u'60', u'61', u'62', u'63', u'64', u'65', u'66', u'67', u'68', u'69',
> u'70', u'71', u'72', u'73', u'74', u'75', u'76', u'77', u'78', u'79', u'80',
> u'81', u'82', u'83', u'84', u'85', u'86', u'87', u'88', u'89', u'90', u'91',
> u'92', u'93', u'94', u'95', u'96', u'97', u'98']
>
> 46 IDs: [u'50', u'51', u'52', u'53', u'54', u'55', u'56', u'57', u'58',
> u'59', u'60', u'61', u'62', u'66', u'67', u'68', u'69', u'70', u'71', u'72',
> u'73', u'74', u'75', u'76', u'77', u'78', u'79', u'80', u'81', u'82', u'83',
> u'84', u'85', u'86', u'87', u'88', u'89', u'90', u'91', u'92', u'93', u'94',
> u'95', u'96', u'97', u'98']
>
> Note how here IDs 63, 64, 65 are missing when persist() has been called. The
> output is correct if the data-frame has not been marked for persistence, but
> incorrect after the call to persist.
>
> When persist() has been called, Tungsten seems to be involved, but not if
> the data-frame has not been persisted. I am including the full outputs of
> toDebugString below.
>
> Has anyone any idea what is going on here?
>
> In case this helps: I see no issue if I don't do the dummy join, or if I
> don't filter for value == "C". I have a default spark config, besides
> "spark.shuffle.consolidateFiles=true", and spark 1.5.1.
>
> Thanks a lot!
>
> - Without persist:
>
> (200) MapPartitionsRDD[26] at javaToPython at
> NativeMethodAccessorImpl.java:-2 []
>   |   MapPartitionsRDD[25] at javaToPython at
> NativeMethodAccessorImpl.java:-2 []
>   |   MapPartitionsWithPreparationRDD[22] at toPandas at
> :25 []
>   |   MapPartitionsWithPreparationRDD[21] at toPandas at
> :25 []
>   |   MapPartitionsRDD[20] at toPandas at :25 []
>   |   ZippedPartitionsRDD2[19] at toPandas at :25 []
>   |   MapPartitionsWithPreparationRDD[9] at toPandas at
> :25 []
>   |   ShuffledRowRDD[8] at toPandas at :25 []
>   +-(2) MapPartitionsRDD[7] at toPandas at :25 []
>  |  MapPartitionsRDD[6] at toPandas at :25 []
>  |  MapPartitionsRDD[5] at toPandas at :25 []
>  |  MapPartitionsRDD[4] at applySchemaToPythonRDD at
> NativeMethodAccessorImpl.java:-2 []
>  |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
>  |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
>  |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
>  |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
>   |   MapPartitionsWithPreparationRDD[18] at toPandas at
> :25 []
>   |   ShuffledRowRDD[17] at toPandas at :25 []
>   +-(200) MapPartitionsRDD[16] at toPandas at :25 []
>   |   MapPartitionsRDD[15] at toPandas at :25 []
>   |   MapPartitionsWithPreparationRDD[14] at toPandas at
> :25 []
>   |  

Re: Handling expirying state in UDF

2015-10-12 Thread Davies Liu
Could you try this?

my_token = None

def my_udf(a):
    global my_token
    if my_token is None:
        # create token here
    # do something with the token

In this way, a new token will be created for each pyspark task
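
A slightly fuller hedged sketch that also refreshes the token after the 600-second
lifetime mentioned in the question; fetch_oauth_token and call_api are hypothetical
helpers standing in for your own code:

import time
from pyspark.sql.functions import udf

_token = None
_token_expiry = 0.0

def api_lookup(value):
    global _token, _token_expiry
    if _token is None or time.time() >= _token_expiry:
        _token = fetch_oauth_token()       # hypothetical: obtain a fresh OAuth token
        _token_expiry = time.time() + 600  # token lifetime from the question
    return call_api(_token, value)         # hypothetical: make the API call

api_udf = udf(api_lookup)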

On Sun, Oct 11, 2015 at 5:14 PM, brightsparc  wrote:
> Hi,
>
> I have created a python UDF to make an API which requires an expirying OAuth
> token which requires refreshing every 600 seconds which is longer than any
> given stage.
>
> Due to the nature of threads and local state, if I use a global variable,
> the variable goes out of scope regularly.
>
> I look into using a broadcast variable, but this doesn't support the ability
> to expire/refresh the variable.  So I looked into using setLocalProperty and
> getLocalProperty on the spark context, but this can't be accessed within a
> UDF.
>
> Is there a recommended way to handle this scenario in PySpark?
>
> Thanks,
> Julian.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Handling-expirying-state-in-UDF-tp25021.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: weird issue with sqlContext.createDataFrame - pyspark 1.3.1

2015-10-09 Thread Davies Liu
Is it possible that you have a very old version of pandas that does
not have DataFrame (or has it in a different submodule)?

Could you try this:
```
>>> import pandas
>>> pandas.__version__
'0.14.0'
```

On Thu, Oct 8, 2015 at 10:28 PM, ping yan  wrote:
> I really cannot figure out what this is about..
> (tried to import pandas, in case that is a dependency, but it didn't help.)
>
 from pyspark.sql import SQLContext
 sqlContext=SQLContext(sc)
 sqlContext.createDataFrame(l).collect()
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark/python/pyspark/sql/context.py",
> line 318, in createDataFrame
> if has_pandas and isinstance(data, pandas.DataFrame):
> AttributeError: 'module' object has no attribute 'DataFrame'
>
> Would appreciate any pointers.
>
> Thanks!
> Ping
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Reading JSON in Pyspark throws scala.MatchError

2015-10-05 Thread Davies Liu
Could you create a JIRA to track this bug?

On Fri, Oct 2, 2015 at 1:42 PM, balajikvijayan
 wrote:
> Running Windows 8.1, Python 2.7.x, Scala 2.10.5, Spark 1.4.1.
>
> I'm trying to read in a large quantity of json data in a couple of files and
> I receive a scala.MatchError when I do so. Json, Python and stack trace all
> shown below.
>
> Json:
>
> {
> "dataunit": {
> "page_view": {
> "nonce": 438058072,
> "person": {
> "user_id": 5846
> },
> "page": {
> "url": "http://mysite.com/blog;
> }
> }
> },
> "pedigree": {
> "true_as_of_secs": 1438627992
> }
> }
>
> Python:
>
> import pyspark
> sc = pyspark.SparkContext()
> sqlContext = pyspark.SQLContext(sc)
> pageviews = sqlContext.read.json("[Path to folder containing file with above
> json]")
> pageviews.collect()
>
> Stack Trace:
> Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 32.0 failed 1 times, most recent failure: Lost task 1.0 in stage
> 32.0 (TID 133, localhost): scala.MatchError:
> (VALUE_STRING,ArrayType(StructType(),true)) (of class scala.Tuple2)
> at
> org.apache.spark.sql.json.JacksonParser$.convertField(JacksonParser.scala:49)
> at
> org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:201)
> at
> org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:193)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:116)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111)
> at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:111)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:111)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:111)
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
> at

Re: StructType has more rows, than corresponding Row has objects.

2015-10-05 Thread Davies Liu
Could you tell us a way to reproduce this failure? Reading from JSON or Parquet?

On Mon, Oct 5, 2015 at 4:28 AM, Eugene Morozov
 wrote:
> Hi,
>
> We're building our own framework on top of spark and we give users a pretty
> complex schema to work with. That requires us to build dataframes by
> ourselves: we transform business objects to rows and struct types and use
> these two to create the dataframe.
>
> Everything was fine until I started to upgrade to spark 1.5.0 (from 1.3.1).
> It seems the Catalyst engine has been changed, and now, using almost the same
> code to produce rows and struct types, I get the following:
> http://ibin.co/2HzUsoe9O96l -- some rows in the end result have a different
> number of values than their corresponding struct types.
>
> I'm almost sure it's my own fault, but there is always a small chance that
> something is wrong in the spark codebase. If you've seen something similar or
> if there is a JIRA for something similar, I'd be glad to know. Thanks.
> --
> Be well!
> Jean Morozov

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to update python code in memory

2015-09-16 Thread Davies Liu
Short answer is No.

On Wed, Sep 16, 2015 at 4:06 AM, Margus Roo  wrote:
> Hi
>
> For example, I submitted python code to the cluster:
> bin/spark-submit --master spark://nn1:7077 SocketListen.py
> Now I discovered that I have to change something in SocketListen.py.
> One way is to stop the old job and submit a new one.
> Is there a way to change the code on the worker machines so that there is no
> need to submit new code?
>
> --
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> +372 51 480
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RE: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-11 Thread Davies Liu
I had ran similar benchmark for 1.5, do self join on a fact table with
join key that had many duplicated rows (there are N rows for the same
join key), say N, after join, there will be N*N rows for each join
key. Generating the joined row is slower in 1.5 than 1.4 (it needs to
copy left and right row together, but not in 1.4). If the generated
row is accessed after join, there will be not much difference between
1.5 and 1.4, because accessing the joined row is slower in 1.4 than
1.5.

So, for this particular query, 1.5 is slower than 1.4, will be even
slower if you increase the N. But for real workload, it will not, 1.5
is usually faster than 1.4.
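
Purely for illustration (the table name "fact" and its columns are assumptions, not
from the thread), the shape of that benchmark looks like:

# self join where each value of `key` appears N times in the fact table
joined = sqlContext.sql(
    "SELECT a.key, a.value AS left_value, b.value AS right_value "
    "FROM fact a JOIN fact b ON a.key = b.key")
joined.count()  # the join produces N*N rows per key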

On Fri, Sep 11, 2015 at 1:31 AM, prosp4300  wrote:
>
>
> By the way, turning off code generation could be an option to try; sometimes
> code generation can introduce slowness.
>
>
> On 2015-09-11 at 15:58, Cheng, Hao wrote:
>
> Can you confirm if the query really run in the cluster mode? Not the local 
> mode. Can you print the call stack of the executor when the query is running?
>
>
>
> BTW: spark.shuffle.reduceLocality.enabled is the configuration of Spark, not 
> Spark SQL.
>
>
>
> From: Todd [mailto:bit1...@163.com]
> Sent: Friday, September 11, 2015 3:39 PM
> To: Todd
> Cc: Cheng, Hao; Jesse F Chen; Michael Armbrust; user@spark.apache.org
> Subject: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ 
> compared with spark 1.4.1 SQL
>
>
>
> I add the following two options:
> spark.sql.planner.sortMergeJoin=false
> spark.shuffle.reduceLocality.enabled=false
>
> But it still performs the same as not setting them two.
>
> One thing is that on the spark ui, when I click the SQL tab, it shows an 
> empty page but the header title 'SQL',there is no table to show queries and 
> execution plan information.
>
>
>
>
>
> At 2015-09-11 14:39:06, "Todd"  wrote:
>
>
> Thanks Hao.
>  Yes,it is still low as SMJ。Let me try the option your suggested,
>
>
>
>
> At 2015-09-11 14:34:46, "Cheng, Hao"  wrote:
>
> You mean the performance is still slow as the SMJ in Spark 1.5?
>
>
>
> Can you set the spark.shuffle.reduceLocality.enabled=false when you start the 
> spark-shell/spark-sql? It’s a new feature in Spark 1.5, and it’s true by 
> default, but we found it probably causes the performance reduce dramatically.
>
>
>
>
>
> From: Todd [mailto:bit1...@163.com]
> Sent: Friday, September 11, 2015 2:17 PM
> To: Cheng, Hao
> Cc: Jesse F Chen; Michael Armbrust; user@spark.apache.org
> Subject: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with 
> spark 1.4.1 SQL
>
>
>
> Thanks Hao for the reply.
> I turn the merge sort join off, the physical plan is below, but the 
> performance is roughly the same as it on...
>
> == Physical Plan ==
> TungstenProject 
> [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
>  ShuffledHashJoin [ss_item_sk#2], [ss_item_sk#25], BuildRight
>   TungstenExchange hashpartitioning(ss_item_sk#2)
>ConvertToUnsafe
> Scan 
> ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_list_price#12,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0]
>   TungstenExchange hashpartitioning(ss_item_sk#25)
>ConvertToUnsafe
> Scan 
> ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_item_sk#25]
>
> Code Generation: true
>
>
>
>
> At 2015-09-11 13:48:23, "Cheng, Hao"  wrote:
>
> This is not a big surprise the SMJ is slower than the HashJoin, as we do not 
> fully utilize the sorting yet, more details can be found at 
> https://issues.apache.org/jira/browse/SPARK-2926 .
>
>
>
> Anyway, can you disable the sort merge join by 
> “spark.sql.planner.sortMergeJoin=false;” in Spark 1.5, and run the query 
> again? In our previous testing, it’s about 20% slower for sort merge join. I 
> am not sure if there anything else slow down the performance.
>
>
>
> Hao
>
>
>
>
>
> From: Jesse F Chen [mailto:jfc...@us.ibm.com]
> Sent: Friday, September 11, 2015 1:18 PM
> To: Michael Armbrust
> Cc: Todd; user@spark.apache.org
> Subject: Re: spark 1.5 SQL slows down dramatically by 50%+ compared with 
> spark 1.4.1 SQL
>
>
>
> Could this be a build issue (i.e., sbt package)?
>
> If I ran the same jar build for 1.4.1 in 1.5, I am seeing large regression 
> too in queries (all other things identical)...
>
> I am curious, to build 1.5 (when it isn't released yet), what do I need to do 
> with the build.sbt file?
>
> any special parameters i should be using to make sure I load the latest hive 
> dependencies?
>
> Michael Armbrust ---09/10/2015 11:07:28 AM---I've been running TPC-DS SF=1500 
> daily on Spark 1.4.1 and Spark 1.5 on S3, so this is surprising.  I
>
> From: Michael Armbrust 
> To: Todd 
> Cc: "user@spark.apache.org" 
> Date: 09/10/2015 11:07 AM

Re: Spark 1.5.0 java.lang.OutOfMemoryError: PermGen space

2015-09-11 Thread Davies Liu
Did this happen immediately after you started the cluster, or after running
some queries?

Is this in local mode or cluster mode?

On Fri, Sep 11, 2015 at 3:00 AM, Jagat Singh  wrote:
> Hi,
>
> We have queries which were running fine on 1.4.1 system.
>
> We are testing upgrade and even simple query like
>
> val t1= sqlContext.sql("select count(*) from table")
>
> t1.show
>
> This works perfectly fine on 1.4.1 but throws OOM error in 1.5.0
>
> Are there any changes in default memory settings from 1.4.1 to 1.5.0
>
> Thanks,
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-11 Thread Davies Liu
Thanks, I'm surprised to see such a large difference (4x); there
could be something wrong in Spark (some contention between tasks).

On Fri, Sep 11, 2015 at 11:47 AM, Jesse F Chen <jfc...@us.ibm.com> wrote:
>
> @Davies...good question..
>
> > Just be curious how the difference would be if you use 20 executors
> > and 20G memory for each executor..
>
> So I tried the following combinations:
>
> (GB X # executors)  (query response time in secs)
> 20X20 415
> 10X40 230
> 5X80 141
> 4X100 128
> 2X200 104
>
> CPU utilization is high so spreading more JVMs onto more vCores helps in this 
> case.
> For other workloads where memory utilization outweighs CPU, i can see larger 
> JVM
> sizes maybe more beneficial. It's for sure case-by-case.
>
> Seems overhead for codegen and scheduler overhead are negligible.
>
>
>
> Davies Liu ---09/11/2015 10:41:23 AM---On Fri, Sep 11, 2015 at 10:31 AM, 
> Jesse F Chen <jfc...@us.ibm.com> wrote: >
>
> From: Davies Liu <dav...@databricks.com>
> To: Jesse F Chen/San Francisco/IBM@IBMUS
> Cc: "Cheng, Hao" <hao.ch...@intel.com>, Todd <bit1...@163.com>, Michael 
> Armbrust <mich...@databricks.com>, "user@spark.apache.org" 
> <user@spark.apache.org>
> Date: 09/11/2015 10:41 AM
> Subject: Re: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ 
> compared with spark 1.4.1 SQL
>
> 
>
>
>
> On Fri, Sep 11, 2015 at 10:31 AM, Jesse F Chen <jfc...@us.ibm.com> wrote:
> >
> > Thanks Hao!
> >
> > I tried your suggestion of setting 
> > spark.shuffle.reduceLocality.enabled=false and my initial tests showed 
> > queries are on par between 1.5 and 1.4.1.
> >
> > Results:
> >
> > tpcds-query39b-141.out:query time: 129.106478631 sec
> > tpcds-query39b-150-reduceLocality-false.out:query time: 128.854284296 sec
> > tpcds-query39b-150.out:query time: 572.443151734 sec
> >
> > With default  spark.shuffle.reduceLocality.enabled=true, I am seeing 
> > across-the-board slow down for majority of the TPCDS queries.
> >
> > My test is on a bare metal 20-node cluster. I ran the my test as follows:
> >
> > /TestAutomation/spark-1.5/bin/spark-submit  --master yarn-client  
> > --packages com.databricks:spark-csv_2.10:1.1.0 --name TPCDSSparkSQLHC
> > --conf spark.shuffle.reduceLocality.enabled=false
> > --executor-memory 4096m --num-executors 100
> > --class org.apache.spark.examples.sql.hive.TPCDSSparkSQLHC
> > /TestAutomation/databricks/spark-sql-perf-master/target/scala-2.10/tpcdssparksql_2.10-0.9.jar
> > hdfs://rhel2.cisco.com:8020/user/bigsql/hadoopds100g
> > /TestAutomation/databricks/spark-sql-perf-master/src/main/queries/jesse/query39b.sql
> >
>
> Just be curious how the difference would be if you use 20 executors
> and 20G memory for each executor. Share the same JVM for some tasks,
> could reduce the overhead for codegen and JIT, it may also reduce the
> overhead of `reduceLocality`(it can be easier to schedule the tasks).
>
> >
> >
> >
> > "Cheng, Hao" ---09/11/2015 01:00:28 AM---Can you confirm if the query 
> > really run in the cluster mode? Not the local mode. Can you print the c
> >
> > From: "Cheng, Hao" <hao.ch...@intel.com>
> > To: Todd <bit1...@163.com>
> > Cc: Jesse F Chen/San Francisco/IBM@IBMUS, Michael Armbrust 
> > <mich...@databricks.com>, "user@spark.apache.org" <user@spark.apache.org>
> > Date: 09/11/2015 01:00 AM
> > Subject: RE: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ 
> > compared with spark 1.4.1 SQL
> >
> > 
> >
> >
> >
> > Can you confirm if the query really run in the cluster mode? Not the local 
> > mode. Can you print the call stack of the executor when the query is 
> > running?
> >
> > BTW: spark.shuffle.reduceLocality.enabled is the configuration of Spark, 
> > not Spark SQL.
> >
> > From: Todd [mailto:bit1...@163.com]
> > Sent: Friday, September 11, 2015 3:39 PM
> > To: Todd
> > Cc: Cheng, Hao; Jesse F Chen; Michael Armbrust; user@spark.apache.org
> > Subject: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ 
> > compared with spark 1.4.1 SQL
> >
> > I add the following two options:
> > spark.sql.planner.sortMergeJoin=false
> > spark.shuffle.reduceLocality.enabled=false
> >
> > But it still performs the same as not setting them two.
> >
> > One thing is that on the spark ui, when I cl

Re: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-11 Thread Davies Liu
On Fri, Sep 11, 2015 at 10:31 AM, Jesse F Chen  wrote:
>
> Thanks Hao!
>
> I tried your suggestion of setting spark.shuffle.reduceLocality.enabled=false 
> and my initial tests showed queries are on par between 1.5 and 1.4.1.
>
> Results:
>
> tpcds-query39b-141.out:query time: 129.106478631 sec
> tpcds-query39b-150-reduceLocality-false.out:query time: 128.854284296 sec
> tpcds-query39b-150.out:query time: 572.443151734 sec
>
> With default  spark.shuffle.reduceLocality.enabled=true, I am seeing 
> across-the-board slow down for majority of the TPCDS queries.
>
> My test is on a bare metal 20-node cluster. I ran the my test as follows:
>
> /TestAutomation/spark-1.5/bin/spark-submit  --master yarn-client  --packages 
> com.databricks:spark-csv_2.10:1.1.0 --name TPCDSSparkSQLHC
> --conf spark.shuffle.reduceLocality.enabled=false
> --executor-memory 4096m --num-executors 100
> --class org.apache.spark.examples.sql.hive.TPCDSSparkSQLHC
> /TestAutomation/databricks/spark-sql-perf-master/target/scala-2.10/tpcdssparksql_2.10-0.9.jar
> hdfs://rhel2.cisco.com:8020/user/bigsql/hadoopds100g
> /TestAutomation/databricks/spark-sql-perf-master/src/main/queries/jesse/query39b.sql
>

Just curious what the difference would be if you used 20 executors
with 20G of memory each. Sharing the same JVM across more tasks
could reduce the overhead of codegen and JIT, and it may also reduce the
overhead of `reduceLocality` (it can be easier to schedule the tasks).

>
>
>
> "Cheng, Hao" ---09/11/2015 01:00:28 AM---Can you confirm if the query really 
> run in the cluster mode? Not the local mode. Can you print the c
>
> From: "Cheng, Hao" 
> To: Todd 
> Cc: Jesse F Chen/San Francisco/IBM@IBMUS, Michael Armbrust 
> , "user@spark.apache.org" 
> Date: 09/11/2015 01:00 AM
> Subject: RE: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ 
> compared with spark 1.4.1 SQL
>
> 
>
>
>
> Can you confirm if the query really run in the cluster mode? Not the local 
> mode. Can you print the call stack of the executor when the query is running?
>
> BTW: spark.shuffle.reduceLocality.enabled is the configuration of Spark, not 
> Spark SQL.
>
> From: Todd [mailto:bit1...@163.com]
> Sent: Friday, September 11, 2015 3:39 PM
> To: Todd
> Cc: Cheng, Hao; Jesse F Chen; Michael Armbrust; user@spark.apache.org
> Subject: Re:Re:RE: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ 
> compared with spark 1.4.1 SQL
>
> I add the following two options:
> spark.sql.planner.sortMergeJoin=false
> spark.shuffle.reduceLocality.enabled=false
>
> But it still performs the same as not setting them two.
>
> One thing is that on the spark ui, when I click the SQL tab, it shows an 
> empty page but the header title 'SQL',there is no table to show queries and 
> execution plan information.
>
>
>
>
> At 2015-09-11 14:39:06, "Todd"  wrote:
>
>
> Thanks Hao.
> Yes,it is still low as SMJ。Let me try the option your suggested,
>
>
> At 2015-09-11 14:34:46, "Cheng, Hao"  wrote:
>
> You mean the performance is still slow as the SMJ in Spark 1.5?
>
> Can you set the spark.shuffle.reduceLocality.enabled=false when you start the 
> spark-shell/spark-sql? It’s a new feature in Spark 1.5, and it’s true by 
> default, but we found it probably causes the performance reduce dramatically.
>
>
> From: Todd [mailto:bit1...@163.com]
> Sent: Friday, September 11, 2015 2:17 PM
> To: Cheng, Hao
> Cc: Jesse F Chen; Michael Armbrust; user@spark.apache.org
> Subject: Re:RE: spark 1.5 SQL slows down dramatically by 50%+ compared with 
> spark 1.4.1 SQL
>
> Thanks Hao for the reply.
> I turn the merge sort join off, the physical plan is below, but the 
> performance is roughly the same as it on...
>
> == Physical Plan ==
> TungstenProject 
> [ss_quantity#10,ss_list_price#12,ss_coupon_amt#19,ss_cdemo_sk#4,ss_item_sk#2,ss_promo_sk#8,ss_sold_date_sk#0]
> ShuffledHashJoin [ss_item_sk#2], [ss_item_sk#25], BuildRight
>  TungstenExchange hashpartitioning(ss_item_sk#2)
>   ConvertToUnsafe
>Scan 
> ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_promo_sk#8,ss_quantity#10,ss_cdemo_sk#4,ss_list_price#12,ss_coupon_amt#19,ss_item_sk#2,ss_sold_date_sk#0]
>  TungstenExchange hashpartitioning(ss_item_sk#25)
>   ConvertToUnsafe
>Scan 
> ParquetRelation[hdfs://ns1/tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales][ss_item_sk#25]
>
> Code Generation: true
>
>
>
> At 2015-09-11 13:48:23, "Cheng, Hao"  wrote:
>
> This is not a big surprise the SMJ is slower than the HashJoin, as we do not 
> fully utilize the sorting yet, more details can be found at 
> https://issues.apache.org/jira/browse/SPARK-2926 .
>
> Anyway, can you disable the sort merge join by 
> “spark.sql.planner.sortMergeJoin=false;” in Spark 1.5, and run the query 
> again? In our previous 

Re: pyspark driver in cluster rather than gateway/client

2015-09-10 Thread Davies Liu
The YARN cluster mode for PySpark is supported since Spark 1.4:
https://issues.apache.org/jira/browse/SPARK-5162?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22python%20cluster%22

On Thu, Sep 10, 2015 at 6:54 AM, roy  wrote:
> Hi,
>
> Is there any way to make the spark driver run inside YARN containers
> rather than on the gateway/client machine?
>
>   At present even with config parameters --master yarn & --deploy-mode
> cluster driver runs on gateway/client machine.
>
> We are on CDH 5.4.1 with YARN and Spark 1.3
>
> any help on this ?
>
> Thanks
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-driver-in-cluster-rather-than-gateway-client-tp24641.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NPE while reading ORC file using Spark 1.4 API

2015-09-08 Thread Davies Liu
I think this is fixed in 1.5 (release soon), by
https://github.com/apache/spark/pull/8407

On Tue, Sep 8, 2015 at 11:39 AM, unk1102  wrote:
> Hi, I read many ORC files in Spark and process them; those files are basically
> Hive partitions. Most of the time processing goes well, but for a few files I
> get the following exception and don't know why. These files work fine in
> Hive using Hive queries. Please guide. Thanks in advance.
>
> DataFrame df = hiveContext.read().format("orc").load("/path/in/hdfs");
>
> java.lang.NullPointerException
> at
> org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:402)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:206)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$8.apply(OrcRelation.scala:238)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$8.apply(OrcRelation.scala:238)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:238)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:290)
> at
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:288)
> at
> org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/NPE-while-reading-ORC-file-using-Spark-1-4-API-tp24609.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Python Spark Streaming example with textFileStream does not work. Why?

2015-09-04 Thread Davies Liu
Spark Streaming only processes NEW files that appear after it has started, so
you should point it at a directory and copy files into that directory after
the streaming context has started.
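
A minimal runnable sketch of that setup (the directory path is a placeholder;
files copied into the directory after start() will be picked up):

```
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "TextFileStreamDemo")
ssc = StreamingContext(sc, 5)  # 5-second batches

# Monitor a DIRECTORY, not a single file; only files that appear
# after start() are processed.
lines = ssc.textFileStream("file:///tmp/streaming_input/")
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()
```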

On Fri, Sep 4, 2015 at 5:15 AM, Kamilbek  wrote:
> I use spark 1.3.1 and Python 2.7
>
> It is my first experience with Spark Streaming.
>
> I try example of code, which reads data from file using spark streaming.
>
> This is link to example:
> https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py
>
> My code is the following:
>
> conf = (SparkConf()
>  .setMaster("local")
>  .setAppName("My app")
>  .set("spark.executor.memory", "1g"))
> sc = SparkContext(conf = conf)
> ssc = StreamingContext(sc, 1)
> lines = ssc.textFileStream('../inputs/2.txt')
> counts = lines.flatMap(lambda line: line.split(" "))\
>   .map(lambda x: (x, 1))\
>   .reduceByKey(lambda a, b: a+b)
> counts.pprint()
> ssc.start()
> ssc.awaitTermination()
>
>
> content of 2.txt file is following:
>
> a1 b1 c1 d1 e1 f1 g1
> a2 b2 c2 d2 e2 f2 g2
> a3 b3 c3 d3 e3 f3 g3
>
>
> I expect that something related to file content will be in console, but
> there are nothing. Nothing except text like this each second:
>
> ---
> Time: 2015-09-03 15:08:18
> ---
>
> and Spark's logs.
>
> Do I do some thing wrong? Otherwise why it does not work?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-Streaming-example-with-textFileStream-does-not-work-Why-tp24579.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: large number of import-related function calls in PySpark profile

2015-09-03 Thread Davies Liu
I think this is not a problem specific to PySpark; you would also see it if you
profile this script:

```
list(map(map_, range(sc.defaultParallelism)))
```

81777/80874    0.086    0.000    0.360    0.000 <frozen importlib._bootstrap>:2264(_handle_fromlist)



On Thu, Sep 3, 2015 at 11:16 AM, Priedhorsky, Reid <rei...@lanl.gov> wrote:
>
> On Sep 2, 2015, at 11:31 PM, Davies Liu <dav...@databricks.com> wrote:
>
> Could you have a short script to reproduce this?
>
>
> Good point. Here you go. This is Python 3.4.3 on Ubuntu 15.04.
>
> import pandas as pd  # must be in default path for interpreter
> import pyspark
>
> LEN = 260
> ITER_CT = 1
>
> conf = pyspark.SparkConf()
> conf.set('spark.python.profile', 'true')
> sc = pyspark.SparkContext(conf=conf)
>
> a = sc.broadcast(pd.Series(range(LEN)))
>
> def map_(i):
>b = pd.Series(range(LEN))
>for i in range(ITER_CT):
>   b.corr(a.value)
>return None
>
> shards = sc.parallelize(range(sc.defaultParallelism), sc.defaultParallelism)
> data = shards.map(map_)
> data.collect()
>
> sc.show_profiles()
>
>
> Run as:
>
> $ spark-submit --master local[4] demo.py
>
>
> Here’s a profile excerpt:
>
> 
> Profile of RDD

Re: spark-submit not using conf/spark-defaults.conf

2015-09-03 Thread Davies Liu
I think it's a missing feature.

On Wed, Sep 2, 2015 at 10:58 PM, Axel Dahl <a...@whisperstream.com> wrote:
> So a bit more investigation, shows that:
>
> if I have configured spark-defaults.conf with:
>
> "spark.files  library.py"
>
> then if I call
>
> "spark-submit.py -v test.py"
>
> I see that my "spark.files" default option has been replaced with
> "spark.files  test.py",  basically spark-submit is overwriting
> spark.files with the name of the script.
>
> Is this a bug or is there another way to add default libraries without
> having to specify them on the command line?
>
> Thanks,
>
> -Axel
>
>
>
> On Wed, Sep 2, 2015 at 10:34 PM, Davies Liu <dav...@databricks.com> wrote:
>>
>> This should be a bug, could you create a JIRA for it?
>>
>> On Wed, Sep 2, 2015 at 4:38 PM, Axel Dahl <a...@whisperstream.com> wrote:
>> > in my spark-defaults.conf I have:
>> > spark.files   file1.zip, file2.py
>> > spark.master   spark://master.domain.com:7077
>> >
>> > If I execute:
>> > bin/pyspark
>> >
>> > I can see it adding the files correctly.
>> >
>> > However if I execute
>> >
>> > bin/spark-submit test.py
>> >
>> > where test.py relies on the file1.zip, I get and error.
>> >
>> > If I i instead execute
>> >
>> > bin/spark-submit --py-files file1.zip test.py
>> >
>> > It works as expected.
>> >
>> > How do I get spark-submit to import the spark-defaults.conf file or what
>> > should I start checking to figure out why one works and the other
>> > doesn't?
>> >
>> > Thanks,
>> >
>> > -Axel
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pySpark window functions are not working in the same way as Spark/Scala ones

2015-09-03 Thread Davies Liu
This is a known bug in 1.4.1, fixed in 1.4.2 and 1.5 (neither is
released yet).

On Thu, Sep 3, 2015 at 7:41 AM, Sergey Shcherbakov
 wrote:
> Hello all,
>
> I'm experimenting with Spark 1.4.1 window functions
> and have come to a problem in pySpark that I've described in a Stackoverflow
> question
>
> In essence, the
>
> wSpec = Window.orderBy(df.a)
> df.select(df.a, func.rank().over(wSpec).alias("rank")).collect()
> df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
> func.lead(df.b,1).over(wSpec).alias("next")).collect()
>
> does not work in pySpark: exception for the first collect() and None output
> from window function in the second collect().
>
> While the same example in Spark/Scala works fine:
>
> val wSpec = Window.orderBy("a")
> df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
> df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"),
> lead(df("b"),1).over(wSpec).alias("next"))
>
> Am I doing anything wrong or this is a pySpark issue indeed?
>
>
> Best Regards,
> Sergey
>
> PS: Here is the full pySpark shell example:
>
> from pyspark.sql.window import Window
> import pyspark.sql.functions as func
>
> l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
> df = sqlContext.createDataFrame(l,["a","b"])
> wSpec = Window.orderBy(df.a).rowsBetween(-1,1)
> df.select(df.a, func.rank().over(wSpec).alias("rank"))
> # ==> Failure org.apache.spark.sql.AnalysisException: Window function rank
> does not take a frame specification.
> df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
> func.lead(df.b,1).over(wSpec).alias("next"))
> # ===>  org.apache.spark.sql.AnalysisException: Window function lag does not
> take a frame specification.;
>
>
> wSpec = Window.orderBy(df.a)
> df.select(df.a, func.rank().over(wSpec).alias("rank"))
> # ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more
> arguments are expected.
>
> df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
> func.lead(df.b,1).over(wSpec).alias("next")).collect()
> # [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202,
> next=None), Row(a=3, prev=None, b=303, next=None)]
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: large number of import-related function calls in PySpark profile

2015-09-03 Thread Davies Liu
The slowness in PySpark may be related to the extra search path entries added
by PySpark; could you show the sys.path?
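
For example, a small sketch that compares the search path on the driver with
the one inside a Python worker (assumes a running SparkContext `sc`, e.g. the
pyspark shell; the RDD is only there to run code on a worker):

```
import sys

# Driver-side search path
print(sys.path)

# Search path seen inside a Python worker process
worker_path = (sc.parallelize(range(sc.defaultParallelism), sc.defaultParallelism)
                 .map(lambda _: sys.path)
                 .first())
print(worker_path)
```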

On Thu, Sep 3, 2015 at 1:38 PM, Priedhorsky, Reid <rei...@lanl.gov> wrote:
>
> On Sep 3, 2015, at 12:39 PM, Davies Liu <dav...@databricks.com> wrote:
>
> I think this is not a problem of PySpark, you also saw this if you
> profile this script:
>
> ```
> list(map(map_, range(sc.defaultParallelism)))
> ```
>
> 81777/80874    0.086    0.000    0.360    0.000 <frozen importlib._bootstrap>:2264(_handle_fromlist)
>
>
> Thanks. Yes, I think you’re right; they seem to be coming from Pandas. Plain
> NumPy calculations do not generate the numerous import-related calls.
>
> That said, I’m still not sure why the time consumed in my real program is so
> much more (~20% rather than ~1%). I will see if I can figure out a better
> test program, or maybe try a different approach.
>
> Reid

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: different Row objects?

2015-09-03 Thread Davies Liu
This was fixed in 1.5; could you download 1.5-RC3 to test it?

On Thu, Sep 3, 2015 at 4:45 PM, Wei Chen  wrote:
> Hey Friends,
>
> Recently I have been using Spark 1.3.1, mainly pyspark.sql. I noticed that
> the Row object collected directly from a DataFrame is different from the Row
> object we directly defined from Row(*arg, **kwarg).
>
from pyspark.sql.types import Row
aaa = Row(a=1, b=2, c=Row(a=1, b=2))
tuple(sc.parallelize([aaa]).toDF().collect()[0])
>
> (1, 2, (1, 2))
>
tuple(aaa)
>
> (1, 2, Row(a=1, b=2))
>
>
> This matters to me because I wanted to be able to create a DataFrame with
> one of the columns being a Row object by sqlcontext.createDataFrame(data,
> schema) where I specifically pass in the schema. However, if the data is RDD
> of Row objects like "aaa" in my example, it'll fail in __verify_type
> function.
>
>
>
> Thank you,
>
> Wei

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-submit not using conf/spark-defaults.conf

2015-09-02 Thread Davies Liu
This should be a bug, could you create a JIRA for it?

On Wed, Sep 2, 2015 at 4:38 PM, Axel Dahl  wrote:
> in my spark-defaults.conf I have:
> spark.files   file1.zip, file2.py
> spark.master   spark://master.domain.com:7077
>
> If I execute:
> bin/pyspark
>
> I can see it adding the files correctly.
>
> However if I execute
>
> bin/spark-submit test.py
>
> where test.py relies on the file1.zip, I get and error.
>
> If I i instead execute
>
> bin/spark-submit --py-files file1.zip test.py
>
> It works as expected.
>
> How do I get spark-submit to import the spark-defaults.conf file or what
> should I start checking to figure out why one works and the other doesn't?
>
> Thanks,
>
> -Axel

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: large number of import-related function calls in PySpark profile

2015-09-02 Thread Davies Liu
Could you have a short script to reproduce this?

On Wed, Sep 2, 2015 at 2:10 PM, Priedhorsky, Reid  wrote:
> Hello,
>
> I have a PySpark computation that relies on Pandas and NumPy. Currently, my
> inner loop iterates 2,000 times. I’m seeing the following show up in my
> profiling:
>
> 74804/29102    0.204    0.000    2.173    0.000 <frozen importlib._bootstrap>:2234(_find_and_load)
> 74804/29102    0.145    0.000    1.867    0.000 <frozen importlib._bootstrap>:2207(_find_and_load_unlocked)
> 45704/29102    0.021    0.000    1.820    0.000 <frozen importlib._bootstrap>:313(_call_with_frames_removed)
> 45702/29100    0.048    0.000    1.793    0.000 {built-in method __import__}
>
>
> That is, there are over 10 apparently import-related calls for each
> iteration of my inner loop. Commenting out the content of my loop removes
> most of the calls, and the number of them seems to scale with the number of
> inner loop iterations, so I’m pretty sure these calls are indeed coming from
> there.
>
> Further examination of the profile shows that the callers of these functions
> are inside Pandas, e.g. tseries.period.__getitem__(), which reads as
> follows:
>
> def __getitem__(self, key):
> getitem = self._data.__getitem__
> if np.isscalar(key):
> val = getitem(key)
> return Period(ordinal=val, freq=self.freq)
> else:
> if com.is_bool_indexer(key):
> key = np.asarray(key)
>
> result = getitem(key)
> if result.ndim > 1:
> # MPL kludge
> # values = np.asarray(list(values), dtype=object)
> # return values.reshape(result.shape)
>
> return PeriodIndex(result, name=self.name, freq=self.freq)
>
> return PeriodIndex(result, name=self.name, freq=self.freq)
>
>
> Note that there are not import statements here or calls to the functions
> above. My guess is that somehow PySpark’s pickle stuff is inserting them,
> e.g., around the self._data access.
>
> This is single-node testing currently. At this scale, about 1/3 of the time
> is spent in these import functions.
>
> Pandas and other modules are available on all workers either via the
> virtualenv or PYTHONPATH. I am not using --py-files.
>
> Since the inner loop is performance-critical, I can’t have imports happening
> there. My question is, why are these import functions being called and how
> can I avoid them?
>
> Thanks for any help.
>
> Reid
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Custom Partitioner

2015-09-01 Thread Davies Liu
You can take the sortByKey as example:
https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L642
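
For RDDs, a custom partitioning can also be expressed directly with
partitionBy(numPartitions, partitionFunc); a rough range-partitioner sketch
(the key ranges are made up for illustration, and `sc` is an existing
SparkContext):

```
def range_partition(key):
    # Assumes integer keys in [0, 100); maps them to 4 contiguous ranges.
    return min(int(key) // 25, 3)

pairs = sc.parallelize([(i, str(i)) for i in range(100)])
partitioned = pairs.partitionBy(4, range_partition)

# Each partition now holds one contiguous key range.
print(partitioned.glom().map(len).collect())
```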

On Tue, Sep 1, 2015 at 3:48 AM, Jem Tucker  wrote:
> something like...
>
> class RangePartitioner(Partitioner):
> def __init__(self, numParts):
> self.numPartitions = numParts
> self.partitionFunction = rangePartition
> def rangePartition(key):
> # Logic to turn key into a partition id
> return id
>
> On Tue, Sep 1, 2015 at 11:38 AM shahid ashraf  wrote:
>>
>> Hi
>>
>> I think range partitioner is not available in pyspark, so if we want
>> create one. how should we create that. my question is that.
>>
>> On Tue, Sep 1, 2015 at 3:57 PM, Jem Tucker  wrote:
>>>
>>> Ah sorry I miss read your question. In pyspark it looks like you just
>>> need to instantiate the Partitioner class with numPartitions and
>>> partitionFunc.
>>>
>>> On Tue, Sep 1, 2015 at 11:13 AM shahid ashraf  wrote:

 Hi

 I did not get this, e.g if i need to create a custom partitioner like
 range partitioner.

 On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker  wrote:
>
> Hi,
>
> You just need to extend Partitioner and override the numPartitions and
> getPartition methods, see below
>
> class MyPartitioner extends partitioner {
>   def numPartitions: Int = // Return the number of partitions
>   def getPartition(key Any): Int = // Return the partition for a given
> key
> }
>
> On Tue, Sep 1, 2015 at 10:15 AM shahid qadri 
> wrote:
>>
>> Hi Sparkians
>>
>> How can we create a customer partition in pyspark
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>



 --
 with Regards
 Shahid Ashraf
>>
>>
>>
>>
>> --
>> with Regards
>> Shahid Ashraf

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problems with Tungsten in Spark 1.5.0-rc2

2015-08-31 Thread Davies Liu
I had sent out a PR [1] to fix 2), could you help to test that?

[1]  https://github.com/apache/spark/pull/8543

On Mon, Aug 31, 2015 at 12:34 PM, Anders Arpteg  wrote:
> Was trying out 1.5 rc2 and noticed some issues with the Tungsten shuffle
> manager. One problem was when using the com.databricks.spark.avro reader and
> the error(1) was received, see stack trace below. The problem does not occur
> with the "sort" shuffle manager.
>
> Another problem was in a large complex job with lots of transformations
> occurring simultaneously, i.e. 50+ or more maps each shuffling data.
> Received error(2) about inability to acquire memory which seems to also have
> to do with Tungsten. Possibly some setting available to increase that
> memory, because there's lots of heap memory available.
>
> Am running on Yarn 2.2 with about 400 executors. Hoping this will give some
> hints for improving the upcoming release, or for me to get some hints to fix
> the problems.
>
> Thanks,
> Anders
>
> Error(1)
>
> 15/08/31 18:30:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 3387,
> lon4-hadoopslave-c245.lon4.spotify.net): java.io.EOFException
>
>at java.io.DataInputStream.readInt(DataInputStream.java:392)
>
>at
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:121)
>
>at
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:109)
>
>at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>
>at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
>at
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>
>at
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>
>at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
>at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:366)
>
>at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>
>at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$Tung
>
> stenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>
>at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>
>at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>
>at
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:47)
>
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
>at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>
>at org.apache.spark.scheduler.Task.run(Task.scala:88)
>
>at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
>at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>at java.lang.Thread.run(Thread.java:745)
>
>
> Error(2)
>
> 5/08/31 18:41:25 WARN TaskSetManager: Lost task 16.1 in stage 316.0 (TID
> 32686, lon4-hadoopslave-b925.lon4.spotify.net): java.io.IOException: Unable
> to acquire 67108864 bytes of memory
>
>at
> org.apache.spark.shuffle.unsafe.UnsafeShuffleExternalSorter.acquireNewPageIfNecessary(UnsafeShuffleExternalSorter.java:385)
>
>at
> org.apache.spark.shuffle.unsafe.UnsafeShuffleExternalSorter.insertRecord(UnsafeShuffleExternalSorter.java:435)
>
>at
> org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:246)
>
>at
> org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:174)
>
>at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>
>at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>at org.apache.spark.scheduler.Task.run(Task.scala:88)
>
>at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
>at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>at java.lang.Thread.run(Thread.java:745)

-
To unsubscribe, 

Re: Join with multiple conditions (In reference to SPARK-7197)

2015-08-25 Thread Davies Liu
It would be good to support this; could you create a JIRA for it and target it for 1.6?
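
Until that is supported, one workaround is to build the join condition from a
list of column names with a reduce; a sketch with made-up data (assumes an
existing `sqlContext`):

```
from functools import reduce  # built in on Python 2; the import keeps it portable

left = sqlContext.createDataFrame([("alice", 30, 1)], ["name", "age", "x"])
right = sqlContext.createDataFrame([("alice", 30, 2)], ["name", "age", "y"])

cols = ["name", "age"]
cond = reduce(lambda a, b: a & b, [left[c] == right[c] for c in cols])
left.join(right, cond, "inner").select(left["name"], right["y"]).show()
```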

On Tue, Aug 25, 2015 at 11:21 AM, Michal Monselise
michal.monsel...@gmail.com wrote:

 Hello All,

 PySpark currently has two ways of performing a join: specifying a join 
 condition or column names.

 I would like to perform a join using a list of columns that appear in both 
 the left and right DataFrames. I have created an example in this question on 
 Stack Overflow.

 Basically, I would like to do the following as specified in the documentation 
 in  /spark/python/pyspark/sql/dataframe.py row 560 and specify a list of 
 column names:

  df.join(df4, ['name', 'age']).select(df.name, df.age).collect()

 However, this produces an error.

 In JIRA issue SPARK-7197, it is mentioned that the syntax is actually 
 different from the one specified in the documentation for joining using a 
 condition.

 Documentation:
  cond = [df.name == df3.name, df.age == df3.age]
  df.join(df3, cond, 'outer').select(df.name, df3.age).collect()

 JIRA Issue:

 a.join(b, (a.year==b.year) & (a.month==b.month), 'inner')


 In other words, the join function cannot take a list.
 I was wondering if you could also clarify what the correct syntax is for 
 providing a list of columns.


 Thanks,
 Michal



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Davies Liu
As Aram said, there two options in Spark 1.4,

1) Use the HiveContext, then you got datediff from Hive,
 df.selectExpr("datediff(d2, d1)")
2) Use Python UDF:
```
 from datetime import date
 df = sqlContext.createDataFrame([(date(2008, 8, 18), date(2008, 9, 26))], 
 ['d1', 'd2'])
 from pyspark.sql.functions import udf
 from pyspark.sql.types import IntegerType
 diff = udf(lambda a, b: (a - b).days, IntegerType())
 df.select(diff(df.d1, df.d2)).show()
+-----------------------+
|PythonUDF#lambda(d1,d2)|
+-----------------------+
|                    -39|
+-----------------------+
```

On Thu, Aug 20, 2015 at 7:45 AM, Aram Mkrtchyan
aram.mkrtchyan...@gmail.com wrote:
 Hi,

 hope this will help you

 import org.apache.spark.sql.functions._
 import sqlContext.implicits._
 import java.sql.Timestamp

 val df = sc.parallelize(Array((date1, date2))).toDF("day1", "day2")

 val dateDiff = udf[Long, Timestamp, Timestamp]((value1, value2) =>
   Days.daysBetween(new DateTime(value2.getTime), new
 DateTime(value1.getTime)).getDays)
 df.withColumn("diff", dateDiff(df("day2"), df("day1"))).show()

 or you can write sql query using hiveql's datediff function.
  https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

 On Thu, Aug 20, 2015 at 4:57 PM, Dhaval Patel dhaval1...@gmail.com wrote:

 More update on this question..I am using spark 1.4.1.

 I was just reading documentation of spark 1.5 (still in development) and I
 think there will be a new func *datediff* that will solve the issue. So
 please let me know if there is any work-around until spark 1.5 is out :).

 pyspark.sql.functions.datediff(end, start)[source]

 Returns the number of days from start to end.

  df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1',
  'd2'])
  df.select(datediff(df.d2, df.d1).alias('diff')).collect()
 [Row(diff=32)]

 New in version 1.5.


 On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 Apologies, sent too early accidentally. Actual message is below
 

 A dataframe has 2 datecolumns (datetime type) and I would like to add
 another column that would have difference between these two dates. Dataframe
 snippet is below.

 new_df.show(5)
 +-----------+----------+--------------+
 |      PATID|   SVCDATE|next_diag_date|
 +-----------+----------+--------------+
 |12345655545|2012-02-13|    2012-02-13|
 |12345655545|2012-02-13|    2012-02-13|
 |12345655545|2012-02-13|    2012-02-27|
 +-----------+----------+--------------+



 Here is what I have tried so far:

 - new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE)).show()
 Error: DateType does not support numeric operations

 - new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE).days).show()
 Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);


 However this simple python code works fine with pySpark:

 from datetime import date
 d0 = date(2008, 8, 18)
 d1 = date(2008, 9, 26)
 delta = d0 - d1
 print (d0 - d1).days

 # -39


 Any suggestions would be appreciated! Also is there a way to add a new
 column in dataframe without using column expression (e.g. like in pandas or
 R. df$new_col = 'new col value')?


 Thanks,
 Dhaval



 On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE).days).show()

 +-----------+----------+--------------+
 |      PATID|   SVCDATE|next_diag_date|
 +-----------+----------+--------------+
 |12345655545|2012-02-13|    2012-02-13|
 |12345655545|2012-02-13|    2012-02-13|
 |12345655545|2012-02-13|    2012-02-27|
 +-----------+----------+--------------+





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark order-only window function issue

2015-08-12 Thread Davies Liu
This should be a bug, go ahead to open a JIRA for it, thanks!

On Tue, Aug 11, 2015 at 6:41 AM, Maciej Szymkiewicz
mszymkiew...@gmail.com wrote:
 Hello everyone,

 I am trying to use PySpark API with window functions without specifying
 partition clause. I mean something equivalent to this

 SELECT v, row_number() OVER (ORDER BY v) AS rn FROM df

 in SQL. I am not sure if I am doing something wrong or it is a bug but
 results are far from what I expect. Let's assume we have data as follows:

 from pyspark.sql.window import Window
 from pyspark.sql import functions as f

 df = sqlContext.createDataFrame(
     zip(["foo"] * 5 + ["bar"] * 5, range(1, 6) + range(6, 11)),
     ("k", "v")
 ).withColumn("dummy", f.lit(1))

 df.registerTempTable("df")
 df.show()

 +---+--+-+
 |  k| v|dummy|
 +---+--+-+
 |foo| 1|1|
 |foo| 2|1|
 |foo| 3|1|
 |foo| 4|1|
 |foo| 5|1|
 |bar| 6|1|
 |bar| 7|1|
 |bar| 8|1|
 |bar| 9|1|
 |bar|10|1|
 +---+--+-+

 When I use following SQL query

 sql_ord = """SELECT k, v, row_number() OVER (
     ORDER BY v
 ) AS rn FROM df"""

 sqlContext.sql(sql_ord).show()

 I get expected results:

 +---+--+--+
 |  k| v|rn|
 +---+--+--+
 |foo| 1| 1|
 |foo| 2| 2|
 |foo| 3| 3|
 |foo| 4| 4|
 |foo| 5| 5|
 |bar| 6| 6|
 |bar| 7| 7|
 |bar| 8| 8|
 |bar| 9| 9|
 |bar|10|10|
 +---+--+--+

 but when I try to define a similar thing using Python API

 w_ord = Window.orderBy("v")
 df.select("k", "v", f.rowNumber().over(w_ord).alias("avg")).show()

 I get results like this:

 +---+--+---+
 |  k| v|avg|
 +---+--+---+
 |foo| 1|  1|
 |foo| 2|  1|
 |foo| 3|  1|
 |foo| 4|  1|
 |foo| 5|  1|
 |bar| 6|  1|
 |bar| 7|  1|
 |bar| 8|  1|
 |bar| 9|  1|
 |bar|10|  1|
 +---+--+---+

 When I specify both partition on order

 w_part_ord = Window.partitionBy("dummy").orderBy("v")
 df.select("k", "v", f.rowNumber().over(w_part_ord).alias("avg")).show()

 everything works as I expect:

 +---+--+---+
 |  k| v|avg|
 +---+--+---+
 |foo| 1|  1|
 |foo| 2|  2|
 |foo| 3|  3|
 |foo| 4|  4|
 |foo| 5|  5|
 |bar| 6|  6|
 |bar| 7|  7|
 |bar| 8|  8|
 |bar| 9|  9|
 |bar|10| 10|
 +---+--+---+

 Another example of similar behavior with correct SQL result:

 sql_ord_rng = """SELECT k, v, avg(v) OVER (
     ORDER BY v
     ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
 ) AS avg FROM df"""
 sqlContext.sql(sql_ord_rng).show()

 +---+--+---+
 |  k| v|avg|
 +---+--+---+
 |foo| 1|1.5|
 |foo| 2|2.0|
 |foo| 3|3.0|
 |foo| 4|4.0|
 |foo| 5|5.0|
 |bar| 6|6.0|
 |bar| 7|7.0|
 |bar| 8|8.0|
 |bar| 9|9.0|
 |bar|10|9.5|
 +---+--+---+

 and the incorrect PySpark result:

 w_ord_rng = Window.orderBy("v").rowsBetween(-1, 1)
 df.select("k", "v", f.avg("v").over(w_ord_rng).alias("avg")).show()

 +---+--++
 |  k| v| avg|
 +---+--++
 |foo| 1| 1.0|
 |foo| 2| 2.0|
 |foo| 3| 3.0|
 |foo| 4| 4.0|
 |foo| 5| 5.0|
 |bar| 6| 6.0|
 |bar| 7| 7.0|
 |bar| 8| 8.0|
 |bar| 9| 9.0|
 |bar|10|10.0|
 +---+--++

 Same as before adding dummy partitions solves the problem:

 w_part_ord_rng = Window.partitionBy("dummy").orderBy("v").rowsBetween(-1, 1)
 df.select("k", "v", f.avg("v").over(w_part_ord_rng).alias("avg")).show()

 +---+--+---+
 |  k| v|avg|
 +---+--+---+
 |foo| 1|1.5|
 |foo| 2|2.0|
 |foo| 3|3.0|
 |foo| 4|4.0|
 |foo| 5|5.0|
 |bar| 6|6.0|
 |bar| 7|7.0|
 |bar| 8|8.0|
 |bar| 9|9.0|
 |bar|10|9.5|
 +---+--+---+

 I've checked window functions tests
 (https://github.com/apache/spark/blob/ac507a03c3371cd5404ca195ee0ba0306badfc23/python/pyspark/sql/tests.py#L1105)
 but these cover only partition + order case.

 Is there something wrong with my window definitions or should I open
 Jira issue?

 Environment:

 - Debian GNU/Linux
 -  Spark 1.4.1
 - Python 2.7.9
 -  OpenJDK Runtime Environment (IcedTea 2.5.5) (7u79-2.5.5-1~deb8u1)

 --
 Best,
 Maciej



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem with take vs. takeSample in PySpark

2015-08-10 Thread Davies Liu
I tested this in master (1.5 release), it worked as expected (changed
spark.driver.maxResultSize to 10m),

 len(sc.range(10).map(lambda i: '*' * (1 << 23) ).take(1))
1
 len(sc.range(10).map(lambda i: '*' * (1 << 24) ).take(1))
15/08/10 10:45:55 ERROR TaskSetManager: Total size of serialized
results of 1 tasks (16.1 MB) is bigger than spark.driver.maxResultSize
(10.0 MB)
 len(sc.range(10).map(lambda i: '*' * (1 << 23) ).take(2))
15/08/10 10:46:04 ERROR TaskSetManager: Total size of serialized
results of 1 tasks (16.1 MB) is bigger than spark.driver.maxResultSize
(10.0 MB)

Could you reproduce this in 1.2?

We haven't changed take() much since 1.2 (unable to build the 1.2 branch right
now because of dependency changes).

On Mon, Aug 10, 2015 at 9:49 AM, David Montague davwm...@gmail.com wrote:
 Hi all,

 I am getting some strange behavior with the RDD take function in PySpark
 while doing some interactive coding in an IPython notebook.  I am running
 PySpark on Spark 1.2.0 in yarn-client mode if that is relevant.

 I am using sc.wholeTextFiles and pandas to load a collection of .csv files
 into an RDD of pandas dataframes. I create an RDD called train_rdd for which
 each row of the RDD contains a label and pandas dataframe pair:

 import pandas as pd
 from StringIO import StringIO

 rdd = sc.wholeTextFiles(data_path, 1000)
 train_rdd = rdd.map(lambda x: x[0], pd.read_csv(StringIO(x[1]

 In order to test out the next steps I want to take, I am trying to use take
 to select one of the dataframes and apply the desired modifications before
 writing out the Spark code to apply it to all of the dataframes in parallel.

 However, when I try to use take like this:

 label, df = train_rdd.take(1)[0]

 I get a spark.driver.maxResultSize error (stack trace included at the end of
 this message). Now, each of these dataframes is only about 100MB in size, so
 should easily fit on the driver and not go over the maxResultSize limit of
 1024MB.

 If I instead use takeSample, though, there is no problem:

 label, df = train_rdd.takeSample(False, 1, seed=50)[0]

 (Here, I have set the seed so that the RDD that is selected is the same one
 that the take function is trying to load (i.e., the first one), just to
 ensure that it is not because the specific dataframe take is getting is too
 large.)

 Does calling take result in a collect operation being performed before
 outputting the first item? That would explain to me why this error is
 occurring, but that seems like poor behavior for the take function. Clearly
 takeSample is behaving the way I want it to, but it would be nice if I could
 get this behavior with the take function, or at least without needing to
 choose an element randomly. I was able to get the behavior I wanted above by
 just changing the seed until I got the dataframe I wanted, but I don't think
 that is a good approach in general.

 Any insight is appreciated.

 Best,
 David Montague




 ---
 Py4JJavaError Traceback (most recent call last)
 ipython-input-38-7eca647cba46 in module()
   1 label_s, df_s = train_rdd.takeSample(False, 1, seed=50)[0]
  2 label, df = train_rdd.take(1)[0]

 /var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/rdd.py in
 take(self, num)
1109
1110 p = range(partsScanned, min(partsScanned +
 numPartsToTry, totalParts))
 -  res = self.context.runJob(self, takeUpToNumLeft, p,
 True)
1112
1113 items += res

 /var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/context.py
 in runJob(self, rdd, partitionFunc, partitions, allowLocal)
 816 # SparkContext#runJob.
 817 mappedRDD = rdd.mapPartitions(partitionFunc)
 -- 818 it = self._jvm.PythonRDD.runJob(self._jsc.sc(),
 mappedRDD._jrdd, javaPartitions, allowLocal)
 819 return list(mappedRDD._collect_iterator_through_file(it))
 820

 /var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer, self.gateway_client,
 -- 538 self.target_id, self.name)
 539
 540 for temp_arg in temp_args:

 /var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.runJob.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Total
 size of serialized results of 177 

Re: collect() works, take() returns ImportError: No module named iter

2015-08-10 Thread Davies Liu
Is it possible that you have Python 2.7 on the driver, but Python 2.6
on the workers?

PySpark requires that you have the same minor version of Python on
both the driver and the workers. In PySpark 1.4+, this is checked before
running any tasks.
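
One sketch of pinning the interpreter so driver and workers match (the paths
are placeholders; the usual place for this is spark-env.sh or the environment
of the submitting shell):

```
from pyspark import SparkConf, SparkContext

# Interpreter the Python workers on the executors should use (placeholder path).
PY = "/opt/anaconda/bin/python2.7"

# spark.executorEnv.* sets environment variables for the executors.
conf = SparkConf().set("spark.executorEnv.PYSPARK_PYTHON", PY)
sc = SparkContext(conf=conf)

# The driver side is controlled by the PYSPARK_PYTHON environment variable
# set before launching pyspark/spark-submit, e.g. in spark-env.sh.
```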

On Mon, Aug 10, 2015 at 2:53 PM, YaoPau jonrgr...@gmail.com wrote:
 I'm running Spark 1.3 on CDH 5.4.4, and trying to set up Spark to run via
 iPython Notebook.  I'm getting collect() to work just fine, but take()
 errors.  (I'm having issues with collect() on other datasets ... but take()
 seems to break every time I run it.)

 My code is below.  Any thoughts?

 sc
 pyspark.context.SparkContext at 0x7ffbfa310f10
 sys.version
 '2.7.10 |Anaconda 2.3.0 (64-bit)| (default, May 28 2015, 17:02:03) \n[GCC
 4.4.7 20120313 (Red Hat 4.4.7-1)]'
 hourly = sc.textFile('tester')
 hourly.collect()
 [u'a man',
  u'a plan',
  u'a canal',
  u'panama']
 hourly = sc.textFile('tester')
 hourly.take(2)
 ---
 Py4JJavaError Traceback (most recent call last)
 ipython-input-15-1feecba5868b in module()
   1 hourly = sc.textFile('tester')
  2 hourly.take(2)

 /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py in take(self, num)
1223
1224 p = range(partsScanned, min(partsScanned +
 numPartsToTry, totalParts))
 - 1225 res = self.context.runJob(self, takeUpToNumLeft, p,
 True)
1226
1227 items += res

 /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py in
 runJob(self, rdd, partitionFunc, partitions, allowLocal)
 841 # SparkContext#runJob.
 842 mappedRDD = rdd.mapPartitions(partitionFunc)
 -- 843 it = self._jvm.PythonRDD.runJob(self._jsc.sc(),
 mappedRDD._jrdd, javaPartitions, allowLocal)
 844 return list(mappedRDD._collect_iterator_through_file(it))
 845

 /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer, self.gateway_client,
 -- 538 self.target_id, self.name)
 539
 540 for temp_arg in temp_args:

 /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.runJob.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage
 10.0 (TID 47, dhd490101.autotrader.com):
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File
 /opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py,
 line 101, in main
 process()
   File
 /opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py,
 line 96, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/serializers.py,
 line 236, in dump_stream
 vs = list(itertools.islice(iterator, batch))
   File /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py, line
 1220, in takeUpToNumLeft
 while taken  left:
 ImportError: No module named iter

 at 
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
 at
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at
 

Re: SparkR Supported Types - Please add bigint

2015-08-07 Thread Davies Liu
They are actually the same thing, LongType. `long` is friendly for
developers; `bigint` is friendly for database people and maybe data
scientists.
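
Both names can be seen on the type itself, e.g. in a pyspark shell:

```
from pyspark.sql.types import LongType

LongType().typeName()      # 'long'   -> what printSchema() shows
LongType().simpleString()  # 'bigint' -> the SQL/DDL-facing name
```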

On Thu, Jul 23, 2015 at 11:33 PM, Sun, Rui rui@intel.com wrote:
 printSchema calls StructField. buildFormattedString() to output schema 
 information. buildFormattedString() use DataType.typeName as string 
 representation of  the data type.

 LongType. typeName = long
 LongType.simpleString = bigint

 I am not sure about the difference of these two type name representations.

 -Original Message-
 From: Exie [mailto:tfind...@prodevelop.com.au]
 Sent: Friday, July 24, 2015 1:35 PM
 To: user@spark.apache.org
 Subject: Re: SparkR Supported Types - Please add bigint

 Interestingly, after more digging, df.printSchema() in raw spark shows the 
 columns as a long, not a bigint.

 root
  |-- localEventDtTm: timestamp (nullable = true)
  |-- asset: string (nullable = true)
  |-- assetCategory: string (nullable = true)
  |-- assetType: string (nullable = true)
  |-- event: string (nullable = true)
  |-- extras: array (nullable = true)
  ||-- element: struct (containsNull = true)
  |||-- name: string (nullable = true)
  |||-- value: string (nullable = true)
  |-- ipAddress: string (nullable = true)
  |-- memberId: string (nullable = true)
  |-- system: string (nullable = true)
  |-- timestamp: long (nullable = true)
  |-- title: string (nullable = true)
  |-- trackingId: string (nullable = true)
  |-- version: long (nullable = true)

 I'm going to have to keep digging I guess. :(




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Supported-Types-Please-add-bigint-tp23975p23978.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
 commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: large scheduler delay in pyspark

2015-08-04 Thread Davies Liu
On Mon, Aug 3, 2015 at 9:00 AM, gen tang gen.tan...@gmail.com wrote:
 Hi,

 Recently, I met some problems about scheduler delay in pyspark. I worked
 several days on this problem, but not success. Therefore, I come to here to
 ask for help.

 I have a key_value pair rdd like rdd[(key, list[dict])] and I tried to merge
 value by adding two list

 if I do reduceByKey as follows:
rdd.reduceByKey(lambda a, b: a+b)
 It works fine, scheduler delay is less than 10s. However if I do
 reduceByKey:
def f(a, b):
for i in b:
 if i not in a:
a.append(i)
return a
   rdd.reduceByKey(f)

Is it possible that you have a large object that is also named `i`, `a`, or `b`?

Btw, the second one could be slower than the first, because you look up
an object in a list, which is O(N), especially when the objects are large (dicts).
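
A sketch of making that membership test O(1) by deduplicating on a canonical
key (assumes the dicts are JSON-serializable):

```
import json

def dict_key(d):
    # Canonical, hashable representation of a dict.
    return json.dumps(d, sort_keys=True)

def merge_lists(a, b):
    seen = set(dict_key(d) for d in a)
    for d in b:
        k = dict_key(d)
        if k not in seen:
            a.append(d)
            seen.add(k)
    return a

# rdd.reduceByKey(merge_lists)
```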

 It will cause very large scheduler delay, about 15-20 mins.(The data I deal
 with is about 300 mb, and I use 5 machine with 32GB memory)

If you see scheduler delay, it means there may be a large broadcast involved.

 I know the second code is not the same as the first. In fact, my purpose is
 to implement the second, but not work. So I try the first one.
 I don't know whether this is related to the data(with long string) or Spark
 on Yarn. But the first code works fine on the same data.

 Is there any way to find out the log when spark stall in scheduler delay,
 please? Or any ideas about this problem?

 Thanks a lot in advance for your help.

 Cheers
 Gen



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark Nested Json Parsing

2015-07-20 Thread Davies Liu
Could you try SQLContext.read.json()?
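
A minimal sketch of that (assumes Spark 1.4+; on 1.2 the equivalent call is
sqlContext.jsonFile(); the path is a placeholder):

```
df = sqlContext.read.json("hdfs:///path/to/input.json")
df.printSchema()
df.select("label.label2").show()
```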

On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu dav...@databricks.com wrote:
 Before using the JSON file as a text file, can you make sure that each
 JSON string fits on one line? textFile() will split the
 file by '\n'.

 On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote:
 Hi,

 I am new to Apache Spark. I am trying to parse nested json using pyspark.
 Here is the code by which I am trying to parse Json.
 I am using Apache Spark 1.2.0 version of cloudera CDH 5.3.2.

 lines = sc.textFile(inputFile)

 import json
 def func(x):
 json_str = json.loads(x)
 if json_str['label']:
 if json_str['label']['label2']:
 return (1,1)
 return (0,1)

 lines.map(func).reduceByKey(lambda a,b: a + b).saveAsTextFile(outputFile)

 I am getting following error,
 ERROR [Executor task launch worker-13] executor.Executor
 (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 107, in main
 process()
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 98, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in pipeline_func
 return func(split, prev_func(split, iterator))
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in pipeline_func
 return func(split, prev_func(split, iterator))
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 247, in func
 return f(iterator)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 1561, in combineLocally
 merger.mergeValues(iterator)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py,
 line 252, in mergeValues
 for k, v in iterator:
   File stdin, line 2, in func
   File /usr/lib64/python2.6/json/__init__.py, line 307, in loads
 return _default_decoder.decode(s)
   File /usr/lib64/python2.6/json/decoder.py, line 319, in decode
 obj, end = self.raw_decode(s, idx=_w(s, 0).end())
   File /usr/lib64/python2.6/json/decoder.py, line 336, in raw_decode
 obj, end = self._scanner.iterscan(s, **kw).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
 value, end = iterscan(s, idx=end, context=context).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
 value, end = iterscan(s, idx=end, context=context).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 155, in JSONString
 return scanstring(match.string, match.end(), encoding, strict)
 ValueError: Invalid \escape: line 1 column 855 (char 855)

 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
 at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12]
 executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in
 stage 14.0 (TID 24)
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 107, in main
 process()
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 98, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073

Re: PySpark Nested Json Parsing

2015-07-20 Thread Davies Liu
Before using the JSON file as a text file, can you make sure that each
JSON string fits on one line? textFile() will split the
file by '\n'.

On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote:
 Hi,

 I am new to Apache Spark. I am trying to parse nested json using pyspark.
 Here is the code by which I am trying to parse Json.
 I am using Apache Spark 1.2.0 version of cloudera CDH 5.3.2.

 lines = sc.textFile(inputFile)

 import json
 def func(x):
 json_str = json.loads(x)
 if json_str['label']:
 if json_str['label']['label2']:
 return (1,1)
 return (0,1)

 lines.map(func).reduceByKey(lambda a,b: a + b).saveAsTextFile(outputFile)

 I am getting following error,
 ERROR [Executor task launch worker-13] executor.Executor
 (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 107, in main
 process()
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 98, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in pipeline_func
 return func(split, prev_func(split, iterator))
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in pipeline_func
 return func(split, prev_func(split, iterator))
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 247, in func
 return f(iterator)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 1561, in combineLocally
 merger.mergeValues(iterator)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py,
 line 252, in mergeValues
 for k, v in iterator:
   File stdin, line 2, in func
   File /usr/lib64/python2.6/json/__init__.py, line 307, in loads
 return _default_decoder.decode(s)
   File /usr/lib64/python2.6/json/decoder.py, line 319, in decode
 obj, end = self.raw_decode(s, idx=_w(s, 0).end())
   File /usr/lib64/python2.6/json/decoder.py, line 336, in raw_decode
 obj, end = self._scanner.iterscan(s, **kw).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
 value, end = iterscan(s, idx=end, context=context).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
 value, end = iterscan(s, idx=end, context=context).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 155, in JSONString
 return scanstring(match.string, match.end(), encoding, strict)
 ValueError: Invalid \escape: line 1 column 855 (char 855)

 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
 at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12]
 executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in
 stage 14.0 (TID 24)
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 107, in main
 process()
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 98, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in pipeline_func
 return func(split, prev_func(split, iterator))
   File
 

Re: Spark and SQL Server

2015-07-20 Thread Davies Liu
Sorry for the confusion. What are the other issues?

On Mon, Jul 20, 2015 at 8:26 AM, Young, Matthew T
matthew.t.yo...@intel.com wrote:
 Thanks Davies, that resolves the issue with Python.

 I was using the Java/Scala DataFrame documentation 
 https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrameWriter.html
  and assuming that it was the same for PySpark 
 http://spark.apache.org/docs/1.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.
  I will keep this distinction in mind going forward.

 I guess we have to wait for Microsoft to release an SQL Server connector for 
 Spark to resolve the other issues.

 Cheers,

 -- Matthew Young

 
 From: Davies Liu [dav...@databricks.com]
 Sent: Saturday, July 18, 2015 12:45 AM
 To: Young, Matthew T
 Cc: user@spark.apache.org
 Subject: Re: Spark and SQL Server

 I think you have a mistake in the call to jdbc(); the signature is:

 jdbc(self, url, table, mode, properties)

 You passed properties as the third parameter.

 On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T
 matthew.t.yo...@intel.com wrote:
 Hello,

 I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 
 4.2 JDBC Driver. Reading from the database works ok, but I have encountered 
 a couple of issues writing back. In Scala 2.10 I can write back to the 
 database except for a couple of types.


 1.  When I read a DataFrame from a table that contains a datetime column 
 it comes in as a java.sql.Timestamp object in the DataFrame. This is alright 
 for Spark purposes, but when I go to write this back to the database with 
 df.write.jdbc(…) it errors out because it is trying to write the TimeStamp 
 type to SQL Server, which is not a date/time storing type in TSQL. I think 
 it should be writing a datetime, but I’m not sure how to tell Spark this.



 2.  A related misunderstanding happens when I try to write a 
 java.lang.boolean to the database; it errors out because Spark is trying to 
 specify the width of the bit type, which is illegal in SQL Server (error 
 msg: Cannot specify a column width on data type bit). Do I need to edit 
 Spark source to fix this behavior, or is there a configuration option 
 somewhere that I am not aware of?


 When I attempt to write back to SQL Server in an IPython notebook, py4j 
 seems unable to convert a Python dict into a Java hashmap, which is 
 necessary for parameter passing. I’ve documented details of this problem 
 with code examples 
 here: http://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session.
  Any advice would be appreciated.

 Thank you for your time,

 -- Matthew Young

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark and SQL Server

2015-07-18 Thread Davies Liu
I think you have a mistake in the call to jdbc(); it should be:

jdbc(self, url, table, mode, properties)

You passed properties as the third parameter.
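
For reference, a minimal sketch of the corrected call order; the URL, table
name, and connection properties below are placeholders, not values from this
thread, and df is assumed to be an existing DataFrame:

props = {"user": "username", "password": "password",
         "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
url = "jdbc:sqlserver://dbhost:1433;databaseName=test"

# wrong: properties passed where mode is expected
# df.write.jdbc(url, "some_table", props)

# right: mode is the third argument, properties is passed by name
df.write.jdbc(url, "some_table", mode="append", properties=props)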

On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T
matthew.t.yo...@intel.com wrote:
 Hello,

 I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 
 4.2 JDBC Driver. Reading from the database works ok, but I have encountered a 
 couple of issues writing back. In Scala 2.10 I can write back to the database 
 except for a couple of types.


 1.  When I read a DataFrame from a table that contains a datetime column 
 it comes in as a java.sql.Timestamp object in the DataFrame. This is alright 
 for Spark purposes, but when I go to write this back to the database with 
 df.write.jdbc(…) it errors out because it is trying to write the TimeStamp 
 type to SQL Server, which is not a date/time storing type in TSQL. I think it 
 should be writing a datetime, but I’m not sure how to tell Spark this.



 2.  A related misunderstanding happens when I try to write a 
 java.lang.boolean to the database; it errors out because Spark is trying to 
 specify the width of the bit type, which is illegal in SQL Server (error msg: 
 Cannot specify a column width on data type bit). Do I need to edit Spark 
 source to fix this behavior, or is there a configuration option somewhere 
 that I am not aware of?


 When I attempt to write back to SQL Server in an IPython notebook, py4j seems 
 unable to convert a Python dict into a Java hashmap, which is necessary for 
 parameter passing. I’ve documented details of this problem with code examples 
 here: http://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session.
  Any advice would be appreciated.

 Thank you for your time,

 -- Matthew Young

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Running foreach on a list of rdds in parallel

2015-07-16 Thread Davies Liu
sc.union(rdds).saveAsTextFile()
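
Spelled out, a minimal sketch of the suggestion above (the RDD names and
output path are placeholders): union() builds a single RDD over all of the
partitions, so one saveAsTextFile() job writes everything instead of four
sequential jobs. Note that all records end up under one output directory.

# rdd1..rdd4 are assumed to be existing RDDs on the SparkContext sc
rdds = [rdd1, rdd2, rdd3, rdd4]
sc.union(rdds).saveAsTextFile("/tmp/cache/union_output")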

On Wed, Jul 15, 2015 at 10:37 PM, Brandon White bwwintheho...@gmail.com wrote:
 Hello,

 I have a list of rdds

 List(rdd1, rdd2, rdd3,rdd4)

 I would like to save these rdds in parallel. Right now, it is running each
 operation sequentially. I tried using a rdd of rdd but that does not work.

 list.foreach { rdd =>
   rdd.saveAsTextFile("/tmp/cache/")
 }

 Any ideas?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark 1.4 udf change date values

2015-07-16 Thread Davies Liu
Thanks for reporting this, could you file a JIRA for it?
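
For anyone trying to reproduce, a minimal sketch of the pattern reported
below; df and the column name "d" are placeholders, and the identity UDF
should in principle return the input dates unchanged:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DateType

# identity UDF over a DateType column; the report says the output differs
identity = UserDefinedFunction(lambda x: x, DateType())
df.select(df["d"], identity(df["d"]).alias("d_copy")).show()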

On Thu, Jul 16, 2015 at 8:22 AM, Luis Guerra luispelay...@gmail.com wrote:
 Hi all,

 I am having some troubles when using a custom udf in dataframes with pyspark
 1.4.

 I have rewritten the udf to simplify the problem and it gets even weirder.
 The UDFs I am using do absolutely nothing; they just receive a value and
 output the same value in the same format.

 I show you my code below:

 c= a.join(b, a['ID'] == b['ID_new'], 'inner')

 c.filter(c['ID'] == 'XX').show()

 udf_A = UserDefinedFunction(lambda x: x, DateType())
 udf_B = UserDefinedFunction(lambda x: x, DateType())
 udf_C = UserDefinedFunction(lambda x: x, DateType())

 d = c.select(c['ID'], c['t1'].alias('ta'),
 udf_A(vinc_muestra['t2']).alias('tb'),
 udf_B(vinc_muestra['t1']).alias('tc'),
 udf_C(vinc_muestra['t2']).alias('td'))

 d.filter(d['ID'] == 'XX').show()

 I am showing here the results from the outputs:

 +--------+--------+----------+----------+
 |      ID|  ID_new|        t1|        t2|
 +--------+--------+----------+----------+
 |62698917|62698917|2012-02-28|2014-02-28|
 |62698917|62698917|2012-02-20|2013-02-20|
 |62698917|62698917|2012-02-28|2014-02-28|
 |62698917|62698917|2012-02-20|2013-02-20|
 |62698917|62698917|2012-02-20|2013-02-20|
 |62698917|62698917|2012-02-28|2014-02-28|
 |62698917|62698917|2012-02-28|2014-02-28|
 |62698917|62698917|2012-02-20|2013-02-20|
 +--------+--------+----------+----------+

 +--------+----------+--------+--------+--------+
 |      ID|        ta|      tb|      tc|      td|
 +--------+----------+--------+--------+--------+
 |62698917|2012-02-28|20070305|20030305|20140228|
 |62698917|2012-02-20|20070215|20020215|20130220|
 |62698917|2012-02-28|20070310|20050310|20140228|
 |62698917|2012-02-20|20070305|20030305|20130220|
 |62698917|2012-02-20|20130802|20130102|20130220|
 |62698917|2012-02-28|20070215|20020215|20140228|
 |62698917|2012-02-28|20070215|20020215|20140228|
 |62698917|2012-02-20|20140102|20130102|20130220|
 +--------+----------+--------+--------+--------+

 My problem here is that the values in columns 'tb', 'tc' and 'td' of dataframe
 'd' are completely different from the values 't1' and 't2' in dataframe 'c',
 even though my udfs do nothing. It seems as if the values were somehow taken
 from other records (or just invented). Results differ between executions
 (apparently at random).

 Any insight on this?

 Thanks in advance


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Language support for Spark libraries

2015-07-13 Thread Davies Liu
On Mon, Jul 13, 2015 at 11:06 AM, Lincoln Atkinson lat...@microsoft.com wrote:
 I’m still getting acquainted with the Spark ecosystem, and wanted to make
 sure my understanding of the different API layers is correct.



 Is this an accurate picture of the major API layers, and their associated
 client support?



 Thanks,

 -Lincoln



 Spark Core:

This should be listed under SQL instead; right now the R API for Spark Core is not public.

 -  Scala

 -  Java

 -  Python

 -  R



 MLLib

 -  Scala

 -  Java

 -  Python



 GraphX

 -  Scala

 -  Java



 SparkSQL

 -  Scala

 -  Java

 -  Python

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark without PySpark

2015-07-08 Thread Davies Liu
Great post, thanks for sharing with us!

On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal sujitatgt...@gmail.com wrote:
 Hi Julian,

 I recently built a Python+Spark application to do search relevance
 analytics. I use spark-submit to submit PySpark jobs to a Spark cluster on
 EC2 (so I don't use the PySpark shell, hopefully thats what you are looking
 for). Can't share the code, but the basic approach is covered in this blog
 post - scroll down to the section Writing a Spark Application.

 https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python

 Hope this helps,

 -sujit
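
For completeness, a minimal sketch of that approach (the file name, paths
and master URL below are placeholders): a plain Python script that builds
its own SparkContext and is launched with spark-submit rather than from the
PySpark shell.

# my_app.py -- a self-contained PySpark job (names/paths are placeholders)
from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("StandalonePySparkApp")
    sc = SparkContext(conf=conf)
    counts = (sc.textFile("hdfs:///data/input.txt")
                .flatMap(lambda line: line.split())
                .map(lambda w: (w, 1))
                .reduceByKey(lambda a, b: a + b))
    counts.saveAsTextFile("hdfs:///data/word_counts")
    sc.stop()

# launched with, e.g.:
#   spark-submit --master spark://master:7077 my_app.py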


 On Wed, Jul 8, 2015 at 7:46 AM, Julian julian+sp...@magnetic.com wrote:

 Hey.

 Is there a resource that has written up what the necessary steps are for
 running PySpark without using the PySpark shell?

 I can reverse engineer (by following the tracebacks and reading the shell
 source) what the relevant Java imports needed are, but I would assume
 someone has attempted this before and just published something I can
 either
 follow or install? If not, I have something that pretty much works and can
 publish it, but I'm not a heavy Spark user, so there may be some things
 I've
 left out that I haven't hit because of how little of pyspark I'm playing
 with.

 Thanks,
 Julian



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-without-PySpark-tp23719.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: User Defined Functions - Execution on Clusters

2015-07-06 Thread Davies Liu
Currently, Python UDFs run in Python interpreter instances and are MUCH slower than
Scala ones (from 10 to 100x). There is a JIRA to improve the
performance: https://issues.apache.org/jira/browse/SPARK-8632. After
that, they will still be much slower than Scala ones (because Python
itself is slower and there is overhead in calling into Python).
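
To make the point concrete, a small sketch (df and the column name are
placeholders): the built-in column function is evaluated inside the JVM,
while the Python UDF ships every row to a Python worker process and back.

from pyspark.sql.functions import udf, upper
from pyspark.sql.types import StringType

upper_py = udf(lambda s: s.upper() if s is not None else None, StringType())

df.select(upper(df["name"])).show()     # evaluated in the JVM
df.select(upper_py(df["name"])).show()  # rows round-trip through Python workers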

On Mon, Jul 6, 2015 at 12:55 PM, Eskilson,Aleksander
alek.eskil...@cerner.com wrote:
 Hi there,

 I’m trying to get a feel for how User Defined Functions from SparkSQL (as
 written in Python and registered using the udf function from
 pyspark.sql.functions) are run behind the scenes. Trying to grok the source
 it seems that the native Python function is serialized for distribution to
 the clusters. In practice, it seems to be able to check for other variables
 and functions defined elsewhere in the namepsace and include those in the
 function’s serialization.

 Following all this though, when actually run, are Python interpreter
 instances on each node brought up to actually run the function against the
 RDDs, or can the serialized function somehow be run on just the JVM? If
 bringing up Python instances is the execution model, what is the overhead of
 PySpark UDFs like as compared to those registered in Scala?

 Thanks,
 Alek
 CONFIDENTIALITY NOTICE This message and any included attachments are from
 Cerner Corporation and are intended only for the addressee. The information
 contained in this message is confidential and may constitute inside or
 non-public information under international, federal, or state securities
 laws. Unauthorized forwarding, printing, copying, distribution, or use of
 such information is strictly prohibited and may be unlawful. If you are not
 the addressee, please promptly delete this message and notify the sender of
 the delivery error by e-mail or you may call Cerner's corporate offices in
 Kansas City, Missouri, U.S.A at (+1) (816)221-1024.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: is there any significant performance issue converting between rdd and dataframes in pyspark?

2015-07-02 Thread Davies Liu
On Mon, Jun 29, 2015 at 1:27 PM, Axel Dahl a...@whisperstream.com wrote:
 In pyspark, when I convert from rdds to dataframes it looks like the rdd is
 being materialized/collected/repartitioned before it's converted to a
 dataframe.

It's not true. When converting an RDD to a DataFrame, it only takes a few rows to
infer the types; no other collect/repartition will happen.

 Just wondering if there's any guidelines for doing this conversion and
 whether it's best to do it early to get the performance benefits of
 dataframes or weigh that against the size/number of items in the rdd.

It's better to do it as early as possible, I think.
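
If even the sampling for type inference is a concern, a sketch of passing an
explicit schema so no rows have to be inspected; sqlContext, rdd and the
field names below are placeholders:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df = sqlContext.createDataFrame(rdd, schema)  # rdd of (name, age) tuples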

 Thanks,

 -Axel


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQL vs. DataFrame API

2015-06-23 Thread Davies Liu
I think it also happens in DataFrames API of all languages.

On Tue, Jun 23, 2015 at 9:16 AM, Ignacio Blasco elnopin...@gmail.com wrote:
 That issue happens only in python dsl?

 El 23/6/2015 5:05 p. m., Bob Corsaro rcors...@gmail.com escribió:

 Thanks! The solution:

 https://gist.github.com/dokipen/018a1deeab668efdf455

 On Mon, Jun 22, 2015 at 4:33 PM Davies Liu dav...@databricks.com wrote:

  Right now, we can not figure out which column you referenced in
  `select`, if there are multiple columns with the same name in the joined
  DataFrame (for example, two `value`).

  A workaround could be:

  numbers2 = numbers.select(df.name, df.value.alias('other'))
  rows = numbers.join(numbers2,
                      (numbers.name == numbers2.name) & (numbers.value != numbers2.other),
                      how="inner") \
                .select(numbers.name, numbers.value, numbers2.other) \
                .collect()

 On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco elnopin...@gmail.com
 wrote:
  Sorry thought it was scala/spark
 
  El 22/6/2015 9:49 p. m., Bob Corsaro rcors...@gmail.com escribió:
 
  That's invalid syntax. I'm pretty sure pyspark is using a DSL to
  create a
  query here and not actually doing an equality operation.
 
  On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com
  wrote:
 
  Probably you should use === instead of == and !== instead of !=
 
  Can anyone explain why the dataframe API doesn't work as I expect it
  to
  here? It seems like the column identifiers are getting confused.
 
  https://gist.github.com/dokipen/4b324a7365ae87b7b0e5

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQL vs. DataFrame API

2015-06-23 Thread Davies Liu
If you change to ```val numbers2 = numbers```, then it has the same problem.

On Tue, Jun 23, 2015 at 2:54 PM, Ignacio Blasco elnopin...@gmail.com wrote:
 It seems that it doesn't happen in Scala API. Not exactly the same as in
 python, but pretty close.

 https://gist.github.com/elnopintan/675968d2e4be68958df8

 2015-06-23 23:11 GMT+02:00 Davies Liu dav...@databricks.com:

 I think it also happens in DataFrames API of all languages.

 On Tue, Jun 23, 2015 at 9:16 AM, Ignacio Blasco elnopin...@gmail.com
 wrote:
  That issue happens only in python dsl?
 
  El 23/6/2015 5:05 p. m., Bob Corsaro rcors...@gmail.com escribió:
 
  Thanks! The solution:
 
  https://gist.github.com/dokipen/018a1deeab668efdf455
 
  On Mon, Jun 22, 2015 at 4:33 PM Davies Liu dav...@databricks.com
  wrote:
 
   Right now, we can not figure out which column you referenced in
   `select`, if there are multiple columns with the same name in the joined
   DataFrame (for example, two `value`).

   A workaround could be:

   numbers2 = numbers.select(df.name, df.value.alias('other'))
   rows = numbers.join(numbers2,
                       (numbers.name == numbers2.name) & (numbers.value != numbers2.other),
                       how="inner") \
                 .select(numbers.name, numbers.value, numbers2.other) \
                 .collect()
 
  On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco
  elnopin...@gmail.com
  wrote:
   Sorry thought it was scala/spark
  
   El 22/6/2015 9:49 p. m., Bob Corsaro rcors...@gmail.com
   escribió:
  
   That's invalid syntax. I'm pretty sure pyspark is using a DSL to
   create a
   query here and not actually doing an equality operation.
  
   On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco
   elnopin...@gmail.com
   wrote:
  
   Probably you should use === instead of == and !== instead of !=
  
   Can anyone explain why the dataframe API doesn't work as I expect
   it
   to
   here? It seems like the column identifiers are getting confused.
  
   https://gist.github.com/dokipen/4b324a7365ae87b7b0e5



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQL vs. DataFrame API

2015-06-22 Thread Davies Liu
Right now, we can not figure out which column you referenced in
`select`, if there are multiple columns with the same name in the joined
DataFrame (for example, two `value`).

A workaround could be:

numbers2 = numbers.select(df.name, df.value.alias('other'))
rows = numbers.join(numbers2,
                    (numbers.name == numbers2.name) & (numbers.value != numbers2.other),
                    how="inner") \
              .select(numbers.name, numbers.value, numbers2.other) \
              .collect()
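
Not from the thread, just a sketch of another common way to disambiguate a
self-join: alias each side of the join and reference columns through the
aliases. Whether this sidesteps the ambiguity on Spark 1.4 in particular
would need testing; numbers is assumed to be the same DataFrame as above.

from pyspark.sql.functions import col

a = numbers.alias("a")
b = numbers.alias("b")
rows = a.join(b,
              (col("a.name") == col("b.name")) & (col("a.value") != col("b.value")),
              "inner") \
        .select(col("a.name"), col("a.value"), col("b.value").alias("other")) \
        .collect()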

On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco elnopin...@gmail.com wrote:
 Sorry thought it was scala/spark

 El 22/6/2015 9:49 p. m., Bob Corsaro rcors...@gmail.com escribió:

 That's invalid syntax. I'm pretty sure pyspark is using a DSL to create a
 query here and not actually doing an equality operation.

 On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com
 wrote:

 Probably you should use === instead of == and !== instead of !=

 Can anyone explain why the dataframe API doesn't work as I expect it to
 here? It seems like the column identifiers are getting confused.

 https://gist.github.com/dokipen/4b324a7365ae87b7b0e5

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Java Constructor Issues

2015-06-21 Thread Davies Liu
The compiled jar is not consistent with the Python source; maybe you are
using an older version of PySpark together with the assembly jar of Spark
Core 1.4?

On Sun, Jun 21, 2015 at 7:24 AM, Shaanan Cohney shaan...@gmail.com wrote:

 Hi all,


 I'm having an issue running some code that works on a build of spark I made
 (and still have) but now rebuilding it again, I get the below traceback. I
 built it using the 1.4.0 release, profile hadoop-2.4 but version 2.7 and I'm
 using python3. It's not vital to my work (as I can use my other build) but
 I'd still like to figure out what's going on.

 Best,
 shaananc

 Traceback (most recent call last):
   File factor.py, line 73, in module
 main()
   File factor.py, line 53, in main
 poly_filename = polysel.run(sc, parameters)
   File /home/ubuntu/spark_apps/polysel.py, line 90, in run
 polysel1_bestpolys = run_polysel1(sc, parameters)
   File /home/ubuntu/spark_apps/polysel.py, line 72, in run_polysel1
 polysel1_bestpolys = [v for _, v in polysel1_polys.takeOrdered(nrkeep,
 key=lambda s: s[0])]
   File /home/ubuntu/spark/python/pyspark/rdd.py, line 1198, in takeOrdered
 return self.mapPartitions(lambda it: [heapq.nsmallest(num, it,
 key)]).reduce(merge)
   File /home/ubuntu/spark/python/pyspark/rdd.py, line 762, in reduce
 vals = self.mapPartitions(func).collect()
   File /home/ubuntu/spark/python/pyspark/rdd.py, line 736, in collect
 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   File /home/ubuntu/spark/python/pyspark/rdd.py, line 2343, in _jrdd
 bvars, self.ctx._javaAccumulator)
   File /usr/local/lib/python3.4/dist-packages/py4j/java_gateway.py, line
 701, in __call__
 self._fqn)
   File /usr/local/lib/python3.4/dist-packages/py4j/protocol.py, line 304,
 in get_return_value
 format(target_id, '.', name, value))
 py4j.protocol.Py4JError: An error occurred while calling
 None.org.apache.spark.api.python.PythonRDD. Trace:
 py4j.Py4JException: Constructor org.apache.spark.api.python.PythonRDD([class
 org.apache.spark.rdd.ParallelCollectionRDD, class [B, class
 java.util.HashMap, class java.util.ArrayList, class java.lang.Boolean, class
 java.lang.String, class java.util.ArrayList, class
 org.apache.spark.Accumulator]) does not exist
 at
 py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:184)
 at
 py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:202)
 at py4j.Gateway.invoke(Gateway.java:213)
 at
 py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
 at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:745)



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkR - issue when starting the sparkR shell

2015-06-19 Thread Davies Liu
Yes, right now we have only tested SparkR with R 3.x.

On Fri, Jun 19, 2015 at 5:53 AM, Kulkarni, Vikram
vikram.kulka...@hp.com wrote:
 Hello,

   I am seeing this issue when starting the sparkR shell. Please note that I
 have R version 2.14.1.



 [root@vertica4 bin]# sparkR



 R version 2.14.1 (2011-12-22)

 Copyright (C) 2011 The R Foundation for Statistical Computing

 ISBN 3-900051-07-0

 Platform: x86_64-unknown-linux-gnu (64-bit)



 R is free software and comes with ABSOLUTELY NO WARRANTY.

 You are welcome to redistribute it under certain conditions.

 Type 'license()' or 'licence()' for distribution details.



   Natural language support but running in an English locale



 R is a collaborative project with many contributors.

 Type 'contributors()' for more information and

 'citation()' on how to cite R or R packages in publications.



 Type 'demo()' for some demos, 'help()' for on-line help, or

 'help.start()' for an HTML browser interface to help.

 Type 'q()' to quit R.



 [Previously saved workspace restored]



 Error in eval(expr, envir, enclos) :

   could not find function .getNamespace

 Error: unable to load R code in package ‘SparkR’

 During startup - Warning message:

 package ‘SparkR’ in options(defaultPackages) was not found



 Save workspace image? [y/n/c]: n

 [root@vertica4 bin]#



 Error: unable to load R code in package ‘SparkR’

 package ‘SparkR’ in options(defaultPackages) was not found



 Do I need to install 3.x version of R for SparkR?



 Regards,

 Vikram



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Cassandra - Spark 1.3 - reading data from cassandra table with PYSpark

2015-06-19 Thread Davies Liu
On Fri, Jun 19, 2015 at 7:33 AM, Koen Vantomme koen.vanto...@gmail.com wrote:
 Hello,

 I'm trying to read data from a table stored in cassandra with pyspark.
 I found the scala code to loop through the table :
 cassandra_rdd.toArray.foreach(println)

 How can this be translated into PySpark  ?

 code snipplet :
 sc_cass = CassandraSparkContext(conf=conf)
 cassandra_rdd = sc_cass.cassandraTable("tutorial", "user")
 #cassandra_rdd.toArray.foreach(println)

for row in cassandra_rdd.collect():
  print(row)
 cassandra_rdd.

 Regards,
 Koen
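
One caveat on the snippet above: collect() pulls the whole Cassandra table
to the driver. For a quick look, something like take(n) keeps only a handful
of rows on the driver (a sketch, reusing cassandra_rdd from the snippet):

# inspect a few rows without collecting the whole table
for row in cassandra_rdd.take(20):
    print(row)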

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ERROR in withColumn method

2015-06-19 Thread Davies Liu
This is a known issue:
https://issues.apache.org/jira/browse/SPARK-8461?filter=-1

Will be fixed soon by https://github.com/apache/spark/pull/6898

On Fri, Jun 19, 2015 at 5:50 AM, Animesh Baranawal
animeshbarana...@gmail.com wrote:
 I am trying to perform some insert column operations in dataframe. Following
 is the code I used:

 val df = sqlContext.read.json("examples/src/main/resources/people.json")
 df.show() { works correctly }
 df.withColumn("age", df.col("name") ) { works correctly }
 df.withColumn("age", df.col("name") ).show() { gives ERROR }
 df.withColumn("arbitrary", df.col("name") ).show() { gives ERROR }

 This is the ERROR LOG :

 
 ERROR GenerateMutableProjection: failed to compile:

   import org.apache.spark.sql.catalyst.InternalRow;

   public SpecificProjection
 generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
 return new SpecificProjection(expr);
   }

   class SpecificProjection extends
 org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {

 private org.apache.spark.sql.catalyst.expressions.Expression[]
 expressions = null;
 private org.apache.spark.sql.catalyst.expressions.MutableRow
 mutableRow = null;

 public
 SpecificProjection(org.apache.spark.sql.catalyst.expressions.Expression[]
 expr) {
   expressions = expr;
   mutableRow = new
 org.apache.spark.sql.catalyst.expressions.GenericMutableRow(3);
 }

 public
 org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection
 target(org.apache.spark.sql.catalyst.expressions.MutableRow row) {
   mutableRow = row;
   return this;
 }

 /* Provide immutable access to the last projected row. */
 public InternalRow currentValue() {
   return (InternalRow) mutableRow;
 }

 public Object apply(Object _i) {
   InternalRow i = (InternalRow) _i;

 boolean isNull0 = i.isNullAt(0);
 long primitive1 = isNull0 ?
 -1L : (i.getLong(0));

   if(isNull0)
 mutableRow.setNullAt(0);
   else
 mutableRow.setLong(0, primitive1);


 boolean isNull2 = i.isNullAt(1);
 org.apache.spark.unsafe.types.UTF8String primitive3 = isNull2 ?
 null : ((org.apache.spark.unsafe.types.UTF8String)i.apply(1));

   if(isNull2)
 mutableRow.setNullAt(1);
   else
 mutableRow.update(1, primitive3);


 boolean isNull4 = i.isNullAt(1);
 org.apache.spark.unsafe.types.UTF8String primitive5 = isNull4 ?
 null : ((org.apache.spark.unsafe.types.UTF8String)i.apply(1));

   if(isNull4)
 mutableRow.setNullAt(2);
   else
 mutableRow.update(2, primitive5);


   return mutableRow;
 }
   }

 org.codehaus.commons.compiler.CompileException: Line 28, Column 35: Object
 at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
 at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
 at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
 at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
 at org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
 at
 org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
 at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
 at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
 at org.codehaus.janino.UnitCompiler.access$16700(UnitCompiler.java:185)
 at
 org.codehaus.janino.UnitCompiler$31.getParameterTypes2(UnitCompiler.java:8533)
 at org.codehaus.janino.IClass$IInvocable.getParameterTypes(IClass.java:835)
 at org.codehaus.janino.IClass$IMethod.getDescriptor2(IClass.java:1063)
 at org.codehaus.janino.IClass$IInvocable.getDescriptor(IClass.java:849)
 at org.codehaus.janino.IClass.getIMethods(IClass.java:211)
 at org.codehaus.janino.IClass.getIMethods(IClass.java:199)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:409)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
 at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
 at
 org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
 at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
 at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
 at
 org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
 at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
 at
 org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
 at
 

Re: Got the exception when joining RDD with spark streamRDD

2015-06-18 Thread Davies Liu
This seems to be a bug; could you file a JIRA for it?

RDD should be serializable for Streaming job.

On Thu, Jun 18, 2015 at 4:25 AM, Groupme grou...@gmail.com wrote:
 Hi,


 I am writing pyspark stream program. I have the training data set to compute
 the regression model. I want to use the stream data set to test the model.
 So, I join with RDD with the StreamRDD, but i got the exception. Following
 are my source code, and the exception I got. Any help is appreciated. Thanks


 Regards,

 Afancy

 


 from __future__ import print_function

 import sys,os,datetime

 from pyspark import SparkContext
 from pyspark.streaming import StreamingContext
 from pyspark.sql.context import SQLContext
 from pyspark.resultiterable import ResultIterable
 from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
 import numpy as np
 import statsmodels.api as sm


 def splitLine(line, delimiter='|'):
 values = line.split(delimiter)
 st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
 return (values[0],st.hour), values[2:]

 def reg_m(y, x):
 ones = np.ones(len(x[0]))
 X = sm.add_constant(np.column_stack((x[0], ones)))
 for ele in x[1:]:
 X = sm.add_constant(np.column_stack((ele, X)))
 results = sm.OLS(y, X).fit()
 return results

 def train(line):
 y,x = [],[]
 y, x = [],[[],[],[],[],[],[]]
 reading_tmp,temp_tmp = [],[]
 i = 0
 for reading, temperature in line[1]:
 if i%4==0 and len(reading_tmp)==4:
 y.append(reading_tmp.pop())
 x[0].append(reading_tmp.pop())
 x[1].append(reading_tmp.pop())
 x[2].append(reading_tmp.pop())
 temp = float(temp_tmp[0])
 del temp_tmp[:]
  x[3].append(temp-20.0 if temp > 20.0 else 0.0)
  x[4].append(16.0-temp if temp < 16.0 else 0.0)
  x[5].append(5.0-temp if temp < 5.0 else 0.0)
 reading_tmp.append(float(reading))
 temp_tmp.append(float(temperature))
 i = i + 1
 return str(line[0]),reg_m(y, x).params.tolist()

 if __name__ == "__main__":
  if len(sys.argv) != 4:
  print("Usage: regression.py checkpointDir trainingDataDir streamDataDir", file=sys.stderr)
  exit(-1)

  checkpoint, trainingInput, streamInput = sys.argv[1:]
  sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")

  trainingLines = sc.textFile(trainingInput)
  modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
 .groupByKey()\
 .map(lambda line: train(line))\
 .cache()


 ssc = StreamingContext(sc, 2)
 ssc.checkpoint(checkpoint)
  lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))


 testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]),
 line[1])).transform(lambda rdd:  rdd.leftOuterJoin(modelRDD))
 testRDD.pprint(20)

 ssc.start()
 ssc.awaitTermination()


 

 15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set
 to 6 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
 Traceback (most recent call last):
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py,
 line 90, in dumps
 return bytearray(self.serializer.dumps((func.func, func.deserializers)))
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py,
 line 427, in dumps
 return cloudpickle.dumps(obj, 2)
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
 line 622, in dumps
 cp.dump(obj)
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
 line 107, in dump
 return Pickler.dump(self, obj)
   File /usr/lib/python2.7/pickle.py, line 224, in dump
 self.save(obj)
   File /usr/lib/python2.7/pickle.py, line 286, in save
 f(self, obj) # Call unbound method with explicit self
   File /usr/lib/python2.7/pickle.py, line 548, in save_tuple
 save(element)
   File /usr/lib/python2.7/pickle.py, line 286, in save
 f(self, obj) # Call unbound method with explicit self
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
 line 193, in save_function
 self.save_function_tuple(obj)
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
 line 236, in save_function_tuple
 save((code, closure, base_globals))
   File /usr/lib/python2.7/pickle.py, line 286, in save
 f(self, obj) # Call unbound method with explicit self
   File /usr/lib/python2.7/pickle.py, line 548, in save_tuple
 save(element)
   File /usr/lib/python2.7/pickle.py, line 286, in save
 f(self, obj) # Call unbound method with explicit self
   File /usr/lib/python2.7/pickle.py, line 600, in save_list
 self._batch_appends(iter(obj))
   File /usr/lib/python2.7/pickle.py, line 
