Re: Profiling PySpark Pandas UDF

2022-08-25 Thread Takuya UESHIN
Hi Subash,

Have you tried the Python/Pandas UDF Profiler introduced in Spark 3.3?
-
https://spark.apache.org/docs/latest/api/python/development/debugging.html#python-pandas-udf

Hope it can help you.
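A minimal sketch of enabling it, assuming Spark 3.3+ and the spark.python.profile switch described on that page (check the linked docs for the exact settings in your version):

# A minimal sketch: enable the Python/Pandas UDF profiler and show the results.
# spark.python.profile must be set before the SparkContext is created.
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = (
    SparkSession.builder
    .config("spark.python.profile", "true")
    .getOrCreate()
)

@pandas_udf("long")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.range(100).select(add_one("id")).collect()

# Per-UDF cProfile results are accumulated on the driver.
spark.sparkContext.show_profiles()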

Thanks.

On Thu, Aug 25, 2022 at 10:18 AM Russell Jurney 
wrote:

> Subash, I’m here to help :)
>
> I started a test script to demonstrate a solution last night but got a
> cold and haven’t finished it. Give me another day and I’ll get it to you.
> My suggestion is that you run PySpark locally in pytest, with a fixture that
> creates and yields your SparkContext and SparkSession, and then write tests
> that load some test data, perform a count and checkpoint to ensure the data
> is loaded, start a timer, run your UDF on the DataFrame, checkpoint again or
> write some output to disk to make sure it finishes, and then stop the timer
> and compute how long it takes. I’ll show you some code; I have to do this for
> Graphlet AI’s RTL utils and other tools anyway, to figure out how much
> overhead there is when using Pandera and Spark together to validate data:
> https://github.com/Graphlet-AI/graphlet
>
> I’ll respond by tomorrow evening with code in a fist! We’ll see if it gets
> consistent, measurable and valid results! :)
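A hypothetical pytest sketch of the harness described above (the spark fixture, my_pandas_udf, the data size and the threshold are placeholders, not the script promised in the thread):

# A hypothetical pytest sketch: a fixture that yields a local SparkSession,
# and a test that times a pandas UDF end to end.
import time

import pandas as pd
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf


@pytest.fixture(scope="session")
def spark():
    session = (
        SparkSession.builder
        .master("local[*]")
        .appName("pandas-udf-timing")
        .getOrCreate()
    )
    yield session
    session.stop()


@pandas_udf("double")
def my_pandas_udf(v: pd.Series) -> pd.Series:
    # Placeholder for the real UDF under test.
    return v * 2.0


def test_pandas_udf_runtime(spark):
    df = spark.range(1_000_000).withColumn("x", col("id").cast("double"))
    df.count()  # force the input to be materialized before timing

    start = time.perf_counter()
    # Writing to the no-op sink makes sure the UDF actually runs to completion.
    df.withColumn("y", my_pandas_udf("x")).write.format("noop").mode("overwrite").save()
    elapsed = time.perf_counter() - start

    print(f"pandas UDF took {elapsed:.2f}s")
    assert elapsed < 60  # loose sanity bound; tune for your data and cluster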
>
> Russell Jurney
>
> On Thu, Aug 25, 2022 at 10:00 AM Sean Owen  wrote:
>
>> It's important to realize that while pandas UDFs and pandas on Spark are
>> both related to pandas, they are not themselves directly related. The first
>> lets you use pandas within Spark, the second lets you use pandas on Spark.
>>
>> It's hard to say with this info, but you want to look at whether you are
>> doing something expensive in each UDF call and, if so, consider amortizing
>> it with the scalar iterator UDF pattern. Maybe.
>>
>> A pandas UDF is not Spark code itself, so there is no tool in Spark to
>> profile it. Conversely, any approach to profiling pandas or Python code
>> would work here.
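For reference, a minimal sketch of the scalar iterator pandas UDF pattern (Spark 3.0+); load_model is a hypothetical stand-in for whatever per-call work is expensive:

# Scalar iterator pandas UDF: expensive setup runs once per task iterator and
# is amortized over all Arrow batches, instead of once per call.
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf


def load_model():
    # Hypothetical expensive setup: model load, dictionary build, connection, ...
    return lambda s: s * 2.0


@pandas_udf("double")
def score(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = load_model()          # runs once per iterator, not once per batch
    for batch in batches:
        yield model(batch)

# Usage: df.withColumn("score", score("feature_col"))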
>>
>> On Thu, Aug 25, 2022, 11:22 AM Gourav Sengupta 
>> wrote:
>>
>>> Hi,
>>>
>>> Maybe I am jumping to conclusions and making stupid guesses, but have
>>> you tried Koalas, now that it is natively integrated with PySpark?
>>>
>>> Regards
>>> Gourav
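For reference, Koalas now ships as the pandas API on Spark (pyspark.pandas, bundled since Spark 3.2); a tiny sketch:

# The pandas API on Spark (formerly Koalas): a distributed, pandas-like DataFrame.
import pyspark.pandas as ps

psdf = ps.range(10)
psdf["double_id"] = psdf["id"] * 2
print(psdf.head())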
>>>
>>> On Thu, 25 Aug 2022, 11:07 Subash Prabanantham, <
>>> subashpraba...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I was wondering if we have any best practices for using pandas UDFs?
>>>> Profiling a UDF is not an easy task, and our case requires some drilling
>>>> down into the logic of the function.
>>>>
>>>>
>>>> Our use case:
>>>> We are using func(DataFrame) => DataFrame as the interface to the pandas
>>>> UDF. When we run only the function locally it is fast, but when it is
>>>> executed in the Spark environment the processing time is more than
>>>> expected. We have one column whose values are large (BinaryType, ~600 KB),
>>>> and we are wondering whether this could make the Arrow computation slower.
>>>>
>>>> Is there any profiling tool, or a recommended way to debug the cost
>>>> incurred when using a pandas UDF?
>>>>
>>>>
>>>> Thanks,
>>>> Subash
>>>>
>>>> --
>
> Thanks,
> Russell Jurney @rjurney <http://twitter.com/rjurney>
> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> <http://facebook.com/jurney> datasyndrome.com
>


-- 
Takuya UESHIN


Re: [DISCUSS] Remove sorting of fields in PySpark SQL Row construction

2019-11-07 Thread Takuya UESHIN
+1

On Thu, Nov 7, 2019 at 6:54 PM Shane Knapp  wrote:

> +1
>
> On Thu, Nov 7, 2019 at 6:08 PM Hyukjin Kwon  wrote:
> >
> > +1
> >
> > On Wed, Nov 6, 2019 at 11:38 PM, Wenchen Fan wrote:
> >>
> >> Sounds reasonable to me. We should make the behavior consistent within
> Spark.
> >>
> >> On Tue, Nov 5, 2019 at 6:29 AM Bryan Cutler  wrote:
> >>>
> >>> Currently, when a PySpark Row is created with keyword arguments, the
> fields are sorted alphabetically. This has created a lot of confusion with
> users because it is not obvious (although it is stated in the pydocs) that
> they will be sorted alphabetically. Then, later, when a schema is applied and
> the field order does not match, an error occurs. Here is a list of some of
> the JIRAs that I have been tracking, all related to this issue:
> SPARK-24915, SPARK-22232, SPARK-27939, SPARK-27712, and relevant discussion
> of the issue [1].
> >>>
> >>> The original reason for sorting fields is because kwargs in python <
> 3.6 are not guaranteed to be in the same order that they were entered [2].
> Sorting alphabetically ensures a consistent order. Matters are further
> complicated with the flag __from_dict__, which allows the Row fields to be
> referenced by name when made by kwargs, but this flag is not serialized
> with the Row and leads to inconsistent behavior. For instance:
> >>>
> >>> >>> spark.createDataFrame([Row(A="1", B="2")], "B string, A string").first()
> >>> Row(B='2', A='1')
> >>> >>> spark.createDataFrame(spark.sparkContext.parallelize([Row(A="1", B="2")]), "B string, A string").first()
> >>> Row(B='1', A='2')
> >>>
> >>> I think the best way to fix this is to remove the sorting of fields
> when constructing a Row. For users with Python 3.6+, nothing would change
> because these versions of Python ensure that the kwargs stay in the order
> entered. For users with Python < 3.6, using kwargs would check a conf to
> either raise an error or fall back to a LegacyRow that sorts the
> fields as before. With Python < 3.6 being deprecated now, this LegacyRow
> can also be removed at the same time. There are also other ways to create
> Rows that will not be affected. I have opened a JIRA [3] to capture this,
> but I am wondering what others think about fixing this for Spark 3.0?
> >>>
> >>> [1] https://github.com/apache/spark/pull/20280
> >>> [2] https://www.python.org/dev/peps/pep-0468/
> >>> [3] https://issues.apache.org/jira/browse/SPARK-29748
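For reference, this proposal landed in Spark 3.0, where keyword-constructed Rows keep their insertion order (with Python 3.6+); a quick illustration:

# Spark 3.0+ / Python 3.6+: Row kwargs keep insertion order.
from pyspark.sql import Row

r = Row(A="1", B="2")
print(r)          # Row(A='1', B='2') -- fields are no longer sorted alphabetically
print(r.A, r.B)   # fields remain accessible by name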
>
>
>
> --
> Shane Knapp
> UC Berkeley EECS Research / RISELab Staff Technical Lead
> https://rise.cs.berkeley.edu
>
>

-- 
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin


Re: array_contains in package org.apache.spark.sql.functions

2018-06-14 Thread Takuya UESHIN
Hi Chongguang,

Thanks for the report!

That makes sense, and the proposition should work; or we could add something
like `def array_contains(column: Column, value: Column)`.
Other functions, such as `array_position` and `element_at`, may be in the same
situation.

Could you file a JIRA, and submit a PR if possible?
We can have a discussion more about the issue there.

Btw, I guess you can use `expr("array_contains(columnA, columnB)")` as a
workaround.

Thanks.
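The same workaround also works from PySpark; a small sketch with illustrative column names:

# Column-to-column containment check via expr(), mirroring the workaround above.
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(["a", "b"], "a"), (["c"], "a")], ["columnA", "columnB"])
df.withColumn("contains", expr("array_contains(columnA, columnB)")).show()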


On Thu, Jun 14, 2018 at 2:15 AM, 刘崇光  wrote:

>
> -- Forwarded message --
> From: 刘崇光 
> Date: Thu, Jun 14, 2018 at 11:08 AM
> Subject: array_contains in package org.apache.spark.sql.functions
> To: user@spark.apache.org
>
>
> Hello all,
>
> I ran into a use case in project with spark sql and want to share with you
> some thoughts about the function array_contains.
>
> Say I have a Dataframe containing 2 columns. Column A of type "Array of
> String" and Column B of type "String". I want to determine if the value of
> column B is contained in the value of column A, without using a udf of
> course.
> The function array_contains came into my mind naturally:
>
> def array_contains(column: Column, value: Any): Column = withExpr {
>   ArrayContains(column.expr, Literal(value))
> }
>
> However the function takes the column B and does a "Literal" of column B,
> which yields a runtime exception: RuntimeException("Unsupported literal
> type " + v.getClass + " " + v).
>
> Then, after discussion with my friends, we found a solution without using a
> udf:
>
> new Column(ArrayContains(df("ColumnA").expr, df("ColumnB").expr))
>
>
> With this solution in mind, I think we could make the function a little more
> powerful, by doing something like this:
>
> def array_contains(column: Column, value: Any): Column = withExpr {
>   value match {
> case c: Column => ArrayContains(column.expr, c.expr)
> case _ => ArrayContains(column.expr, Literal(value))
>   }
> }
>
>
> It does pattern matching to detect whether value is of type Column. If so,
> it uses the .expr of the column; otherwise it works as it used to.
>
> Any suggestion or opinion on the proposition?
>
>
> Kind regards,
> Chongguang LIU
>
>


-- 
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin


Re: How to insert complex types like map<string,map<string,int>> in spark sql

2014-11-26 Thread Takuya UESHIN
Hi,
I guess this is fixed by https://github.com/apache/spark/pull/3110,
which is not about complex type casting, but it makes inserting into a Hive
table able to handle complex types by ignoring nullability.

I also sent a pull-request (https://github.com/apache/spark/pull/3150)
for complex type casting before.
Please check it if #3110 doesn't work.

Thanks.


2014-11-25 18:37 GMT+09:00 Cheng Lian lian.cs@gmail.com:
 Spark SQL supports complex types, but casting doesn't work for complex types
 right now.


 On 11/25/14 4:04 PM, critikaled wrote:


 https://github.com/apache/spark/blob/84d79ee9ec47465269f7b0a7971176da93c96f3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

 Doesn't look like Spark SQL supports nested complex types right now.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-complex-types-like-map-string-map-string-int-in-spark-sql-tp19603p19730.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin




Re: SparkSQL - Language Integrated query - OR clause and IN clause

2014-07-10 Thread Takuya UESHIN
Hi Prem,

You can write like:

  people.where('age >= 10 && 'month === 2).select('name)
  people.where('age >= 10 || 'month === 2).select('name)
  people.where(In('month, Seq(2, 6))).select('name)

The last one needs to import catalyst.expressions package.


Thanks.
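On current Spark versions, the same OR and IN filters are usually written with the DataFrame API; a rough PySpark sketch for comparison with the 2014-era Scala DSL above:

# Rough modern equivalents of the AND, OR, and IN clauses discussed above.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
people = spark.createDataFrame([("Ann", 12, 2), ("Bob", 9, 6)], ["name", "age", "month"])

people.where((col("age") >= 10) & (col("month") == 2)).select("name").show()   # AND
people.where((col("age") >= 10) | (col("month") == 2)).select("name").show()   # OR
people.where(col("month").isin(2, 6)).select("name").show()                    # IN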


2014-07-10 22:15 GMT+09:00 premdass premdas...@yahoo.co.in:
 Hi,

 Any suggestions on how to implement an OR clause and an IN clause in SparkSQL
 language-integrated queries?

 For example:

  'SELECT name FROM people WHERE age >= 10 AND month = 2' can be written as
 val teenagers = people.where('age >= 10).where('month === 2).select('name)

  How do we write  'SELECT name FROM people WHERE age >= 10 OR month = 2' ?

  Also how do we write  'SELECT name FROM people WHERE month in ( 2,6)' ??

 Any suggestions will be greatly appreciated.

 Thanks,
 Prem



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Language-Integrated-query-OR-clause-and-IN-clause-tp9298.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin


Re: Spark SQL user defined functions

2014-07-04 Thread Takuya UESHIN
Hi,

You can convert a standard RDD of a Product class (e.g. a case class) to a
SchemaRDD with SQLContext.
Load the data from Cassandra into an RDD of a case class, convert it to a
SchemaRDD and register it,
then you can use it in your SQL queries.

http://spark.apache.org/docs/latest/sql-programming-guide.html#running-sql-on-rdds

Thanks.
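A rough modern equivalent of that flow (records -> registered table -> SQL), sketched in PySpark with made-up row contents:

# Register a DataFrame built from an RDD of records and query it with SQL.
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize(
    [Row(id=1, xmlfield="<a/>"), Row(id=2, xmlfield="<b/>")]  # stand-in for Cassandra rows
)
df = spark.createDataFrame(rdd)        # the SchemaRDD conversion, in 2014 terms
df.createOrReplaceTempView("cf")       # registerAsTable, in 2014 terms
spark.sql("SELECT id FROM cf WHERE id = 1").show()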



2014-07-04 17:59 GMT+09:00 Martin Gammelsæter martingammelsae...@gmail.com
:

 Hi!

 I have a Spark cluster running on top of a Cassandra cluster, using
 Datastax' new driver, and one of the fields of my RDDs is an
 XML-string. In a normal Scala Spark job, parsing that data is no
 problem, but I would like to also make that information available
 through Spark SQL. So, is there any way to write user defined
 functions for Spark SQL? I know that a HiveContext is available, but I
 understand that that is for querying data from Hive, and I don't have
 Hive in my stack (please correct me if I'm wrong).

 I would love to be able to do something like the following:

 val casRdd = sparkCtx.cassandraTable(ks, cf)

 // registerAsTable etc

 val res = sql("SELECT id, xmlGetTag(xmlfield, 'sometag') FROM cf")

 --
 Best regards,
 Martin Gammelsæter




-- 
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin


Re: Spark SQL user defined functions

2014-07-04 Thread Takuya UESHIN
Ah, sorry for misreading.

I don't think there is a way to use a UDF in your SQL queries with Spark SQL alone.
You might be able to do it with Spark's Hive support, but I'm sorry, I don't know it well.

I think you should apply the function before converting to a SchemaRDD, if you can.

Thanks.
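For reference, later releases added UDF registration for SQL; a minimal modern PySpark sketch, where xml_get_tag is a hypothetical stand-in for the XML parsing described above:

# Register a Python UDF and call it from SQL (not possible with the 2014
# SQLContext discussed in this thread).
import xml.etree.ElementTree as ET

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

def xml_get_tag(xml_string, tag):
    try:
        elem = ET.fromstring(xml_string).find(tag)
        return elem.text if elem is not None else None
    except ET.ParseError:
        return None

spark.udf.register("xmlGetTag", xml_get_tag, StringType())

df = spark.createDataFrame(
    [(1, "<root><sometag>hello</sometag></root>")], ["id", "xmlfield"]
)
df.createOrReplaceTempView("cf")
spark.sql("SELECT id, xmlGetTag(xmlfield, 'sometag') FROM cf").show()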




2014-07-04 18:16 GMT+09:00 Martin Gammelsæter martingammelsae...@gmail.com
:

 Takuya, thanks for your reply :)
 I am already doing that, and it is working well. My question is, can I
 define arbitrary functions to be used in these queries?

 On Fri, Jul 4, 2014 at 11:12 AM, Takuya UESHIN ues...@happy-camper.st
 wrote:
  Hi,
 
  You can convert standard RDD of Product class (e.g. case class) to
 SchemaRDD
  by SQLContext.
  Load data from Cassandra into RDD of case class, convert it to SchemaRDD
 and
  register it,
  then you can use it in your SQLs.
 
 
 http://spark.apache.org/docs/latest/sql-programming-guide.html#running-sql-on-rdds
 
  Thanks.
 
 
 
  2014-07-04 17:59 GMT+09:00 Martin Gammelsæter
  martingammelsae...@gmail.com:
 
  Hi!
 
  I have a Spark cluster running on top of a Cassandra cluster, using
  Datastax' new driver, and one of the fields of my RDDs is an
  XML-string. In a normal Scala sparkjob, parsing that data is no
  problem, but I would like to also make that information available
  through Spark SQL. So, is there any way to write user defined
  functions for Spark SQL? I know that a HiveContext is available, but I
  understand that that is for querying data from Hive, and I don't have
  Hive in my stack (please correct me if I'm wrong).
 
  I would love to be able to do something like the following:
 
  val casRdd = sparkCtx.cassandraTable(ks, cf)
 
  // registerAsTable etc
 
  val res = sql("SELECT id, xmlGetTag(xmlfield, 'sometag') FROM cf")
 
  --
  Best regards,
  Martin Gammelsæter
 
 
 
 
  --
  Takuya UESHIN
  Tokyo, Japan
 
  http://twitter.com/ueshin



 --
 Mvh.
 Martin Gammelsæter
 92209139




-- 
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin


Re: Spark SQL - groupby

2014-07-03 Thread Takuya UESHIN
Hi,

You need to import Sum and Count like:

import org.apache.spark.sql.catalyst.expressions.{Sum,Count} // or
with wildcard _

or if you use current master branch build, you can use sum('colB)
instead of Sum('colB).

Thanks.
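On current Spark versions, the same aggregation is usually written with DataFrame functions rather than Catalyst expression imports; a rough PySpark sketch:

# Grouped sum plus an overall total, using the DataFrame API.
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, sum as sum_

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3)], ["colA", "colB"])

per_group = df.groupBy("colA").agg(sum_("colB").alias("totB"), count("*").alias("n"))
per_group.agg(sum_("totB").alias("grandTotal")).show()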



2014-07-03 16:09 GMT+09:00 Subacini B subac...@gmail.com:
 Hi,

 Can someone provide me pointers for this issue.

 Thanks
 Subacini


 On Wed, Jul 2, 2014 at 3:34 PM, Subacini B subac...@gmail.com wrote:

 Hi,

 The code below throws a compilation error, "not found: value Sum". Can
 someone help me with this? Do I need to add any jars or imports? Even for
 Count, the same error is thrown.

 val queryResult = sql("select * from Table")
 queryResult.groupBy('colA)('colA, Sum('colB) as 'totB).aggregate(Sum('totB)).collect().foreach(println)

 Thanks
 subacini





-- 
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin