Re: repartition in df vs partitionBy in df

2019-04-24 Thread moqi
Hello, here is another link that I hope will help you:

https://stackoverflow.com/questions/33831561/pyspark-repartition-vs-partitionby

In particular, if you are facing possible data skew, or have partitioning
parameters that can only be determined at runtime, you may also find this
article useful:

https://software.intel.com/en-us/articles/spark-sql-adaptive-execution-at-100-tb
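
A minimal PySpark sketch of the distinction discussed in the first link
(paths and column names are illustrative assumptions, not from the thread):

# repartition: shuffles rows into a new set of in-memory partitions,
# e.g. to spread a skewed key out before a wide operation.
df = spark.read.parquet("/data/events")          # hypothetical input
df2 = df.repartition(200, "country")

# partitionBy: a DataFrameWriter option that only controls the directory
# layout of the output, e.g. /out/events/country=US/part-*.parquet
df2.write.partitionBy("country").parquet("/out/events")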



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: repartition in df vs partitionBy in df

2019-04-24 Thread rajat kumar
hello,
thanks for the quick reply.
Got it - partitionBy is used to create something like Hive partitions.
But when do we actually use repartition?
How do we decide whether to repartition or not? In development we only get
sample data.
Also, what number should I give when repartitioning?

thanks
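
A rough sketch of one common sizing heuristic (an assumption, not an
official rule), in PySpark:

# Aim for partitions of very roughly 128 MB of input data; the figures
# below are illustrative, not Spark defaults.
input_bytes = 50 * 1024 ** 3                  # e.g. ~50 GB of input
target_partition_bytes = 128 * 1024 ** 2      # ~128 MB per partition
num_partitions = max(1, input_bytes // target_partition_bytes)

df = df.repartition(int(num_partitions), "some_key")  # the column is optional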

On Thu, 25 Apr 2019, 10:31 moqi wrote:
> Hello, I think you can refer to this link; I hope it helps.
>
>
> https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby/40417992
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: repartition in df vs partitionBy in df

2019-04-24 Thread moqi
Hello, I think you can refer to this link; I hope it helps.

https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby/40417992





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: repartition in df vs partitionBy in df

2019-04-24 Thread rajat kumar
Hi All,
Can anyone explain?

thanks
rajat

On Sun, 21 Apr 2019, 00:18 kumar.rajat20del wrote:
> Hi Spark Users,
>
> repartition and partitionBy seem to be very similar for DataFrames.
> In which scenario do we use each one?
>
> As per my understanding, repartition is a very expensive operation since it
> needs a full shuffle, so when do we use repartition?
>
> Thanks
> Rajat
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [pyspark] Use output of one aggregated function for another aggregated function within the same groupby

2019-04-24 Thread Georg Heiler
Use analytic window functions to rank the result and then filter to the
desired rank.
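
A minimal PySpark sketch of that idea for the example in the question
(column names follow the original pseudo-code and are assumptions):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("grp_col")

result = (
    df.withColumn("max_date", F.max("file_date").over(w))
      .groupBy("grp_col")
      .agg(F.max("file_date").alias("max_date"),
           F.count(F.when(F.col("file_date") == F.col("max_date"), 1))
            .alias("rows_at_max_date"))
)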

Rishi Shah wrote on Thu, 25 Apr 2019 at 05:07:

> Hi All,
>
> [PySpark 2.3, python 2.7]
>
> I would like to achieve something like this, could you please suggest best
> way to implement (perhaps highlight pros & cons of the approach in terms of
> performance)?
>
> df = df.groupby('grp_col').agg(max(date).alias('max_date'), count(when
> col('file_date') == col('max_date')))
>
> Please note 'max_date' is a result of aggregate function max inside the
> group by agg. I can definitely use multiple groupbys to achieve this but is
> there a better way? better performance wise may be?
>
> Appreciate your help!
>
>
> --
> Regards,
>
> Rishi Shah
>


[pyspark] Use output of one aggregated function for another aggregated function within the same groupby

2019-04-24 Thread Rishi Shah
Hi All,

[PySpark 2.3, python 2.7]

I would like to achieve something like the following. Could you please suggest
the best way to implement it (and perhaps highlight the pros and cons of each
approach in terms of performance)?

df = df.groupby('grp_col').agg(max(date).alias('max_date'), count(when
col('file_date') == col('max_date')))

Please note that 'max_date' is the result of the aggregate function max inside
the same groupBy/agg. I can definitely use multiple groupBys to achieve this,
but is there a better way, perhaps one with better performance?

Appreciate your help!

-- 
Regards,

Rishi Shah


RDD vs Dataframe & when to persist

2019-04-24 Thread Rishi Shah
Hello All,

I run into situations where I ask myself whether I should write a mapPartitions
function on an RDD or use DataFrames all the way (withColumn + groupBy). I am
using PySpark 2.3 (Python 2.7). I understand we should utilize DataFrames as
much as possible, but at times it feels like an RDD function would give more
flexible code. Could you please advise when to prefer one approach over the
other? (Keeping pandas UDFs in mind, which approach makes more sense in which
scenarios?)

Also, how does it affect performance, i.e. using DataFrames all the way vs. an
RDD mapPartitions function?

Another question that always arises is when to persist a DataFrame. Should we
repartition before a groupBy? If so, will skipping persist affect performance?
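
A minimal sketch (illustrative names and paths, PySpark) of the caching
pattern in question; persist tends to pay off only when the same DataFrame
feeds more than one action:

from pyspark import StorageLevel

base = spark.read.parquet("/data/events")              # hypothetical input
enriched = base.filter("amount > 0") \
               .withColumn("day", base["event_ts"].cast("date"))

enriched.persist(StorageLevel.MEMORY_AND_DISK)         # reused twice below

# groupBy already shuffles (spark.sql.shuffle.partitions), so an extra
# repartition right before it is usually unnecessary.
enriched.groupBy("day").count().write.parquet("/out/daily")
enriched.groupBy("user_id").count().orderBy("count", ascending=False).show(20)

enriched.unpersist()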

Any help is much appreciated.

Thanks,
-Rishi


Re: Is it possible to obtain the full command to be invoked by SparkLauncher?

2019-04-24 Thread Jeff Evans
Thanks for the pointers.  We figured out the stdout/stderr capture
piece.  I was just looking to capture the full command in order to
help debug issues we run into with the submit, which depend on various
combinations of parameters and classpath entries, and also to isolate
job-specific issues from our wrapping application (i.e. being able to
submit the job directly, rather than through our app).  I will use the
environment variable method for now.

On Wed, Apr 24, 2019 at 4:18 PM Marcelo Vanzin  wrote:
>
> BTW the SparkLauncher API has hooks to capture the stderr of the
> spark-submit process into the logging system of the parent process.
> Check the API javadocs since it's been forever since I looked at that.
>
> On Wed, Apr 24, 2019 at 1:58 PM Marcelo Vanzin  wrote:
> >
> > Setting the SPARK_PRINT_LAUNCH_COMMAND env variable to 1 in the
> > launcher env will make Spark code print the command to stderr. Not
> > optimal but I think it's the only current option.
> >
> > On Wed, Apr 24, 2019 at 1:55 PM Jeff Evans
> >  wrote:
> > >
> > > The org.apache.spark.launcher.SparkLauncher is used to construct a
> > > spark-submit invocation programmatically, via a builder pattern.  In
> > > our application, which uses a SparkLauncher internally, I would like
> > > to log the full spark-submit command that it will invoke to our log
> > > file, in order to aid in debugging/support.  However, I can't figure
> > > out a way to do this.  This snippet would work, except for the fact
> > > that the createBuilder method is private.
> > >
> > > sparkLauncher.createBuilder().command()
> > >
> > > Is there an alternate way of doing this?  The Spark version is
> > > 2.11:2.4.0.  Thanks.
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> >
> >
> > --
> > Marcelo
>
>
>
> --
> Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is it possible to obtain the full command to be invoked by SparkLauncher?

2019-04-24 Thread Marcelo Vanzin
BTW the SparkLauncher API has hooks to capture the stderr of the
spark-submit process into the logging system of the parent process.
Check the API javadocs since it's been forever since I looked at that.

On Wed, Apr 24, 2019 at 1:58 PM Marcelo Vanzin  wrote:
>
> Setting the SPARK_PRINT_LAUNCH_COMMAND env variable to 1 in the
> launcher env will make Spark code print the command to stderr. Not
> optimal but I think it's the only current option.
>
> On Wed, Apr 24, 2019 at 1:55 PM Jeff Evans
>  wrote:
> >
> > The org.apache.spark.launcher.SparkLauncher is used to construct a
> > spark-submit invocation programmatically, via a builder pattern.  In
> > our application, which uses a SparkLauncher internally, I would like
> > to log the full spark-submit command that it will invoke to our log
> > file, in order to aid in debugging/support.  However, I can't figure
> > out a way to do this.  This snippet would work, except for the fact
> > that the createBuilder method is private.
> >
> > sparkLauncher.createBuilder().command()
> >
> > Is there an alternate way of doing this?  The Spark version is
> > 2.11:2.4.0.  Thanks.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>
> --
> Marcelo



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is it possible to obtain the full command to be invoked by SparkLauncher?

2019-04-24 Thread Sebastian Piu
You could set the env var SPARK_PRINT_LAUNCH_COMMAND and spark-submit
will print it, but it will be printed by the subprocess and not by yours
unless you redirect its output.
Also, the command is what spark-submit generates, so it is quite a bit more
verbose and includes the classpath etc.

I think the only alternative, if the above is not enough, is to get hold of
the builder - you might need to extend the launcher and put your class in the
same package, since the builder is not publicly accessible.

On Wed, 24 Apr 2019 at 21:55, Jeff Evans 
wrote:

> The org.apache.spark.launcher.SparkLauncher is used to construct a
> spark-submit invocation programmatically, via a builder pattern.  In
> our application, which uses a SparkLauncher internally, I would like
> to log the full spark-submit command that it will invoke to our log
> file, in order to aid in debugging/support.  However, I can't figure
> out a way to do this.  This snippet would work, except for the fact
> that the createBuilder method is private.
>
> sparkLauncher.createBuilder().command()
>
> Is there an alternate way of doing this?  The Spark version is
> 2.11:2.4.0.  Thanks.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Is it possible to obtain the full command to be invoked by SparkLauncher?

2019-04-24 Thread Marcelo Vanzin
Setting the SPARK_PRINT_LAUNCH_COMMAND env variable to 1 in the
launcher env will make Spark code print the command to stderr. Not
optimal but I think it's the only current option.

On Wed, Apr 24, 2019 at 1:55 PM Jeff Evans
 wrote:
>
> The org.apache.spark.launcher.SparkLauncher is used to construct a
> spark-submit invocation programmatically, via a builder pattern.  In
> our application, which uses a SparkLauncher internally, I would like
> to log the full spark-submit command that it will invoke to our log
> file, in order to aid in debugging/support.  However, I can't figure
> out a way to do this.  This snippet would work, except for the fact
> that the createBuilder method is private.
>
> sparkLauncher.createBuilder().command()
>
> Is there an alternate way of doing this?  The Spark version is
> 2.11:2.4.0.  Thanks.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Is it possible to obtain the full command to be invoked by SparkLauncher?

2019-04-24 Thread Jeff Evans
The org.apache.spark.launcher.SparkLauncher is used to construct a
spark-submit invocation programmatically, via a builder pattern.  In
our application, which uses a SparkLauncher internally, I would like
to log the full spark-submit command that it will invoke to our log
file, in order to aid in debugging/support.  However, I can't figure
out a way to do this.  This snippet would work, except for the fact
that the createBuilder method is private.

sparkLauncher.createBuilder().command()

Is there an alternate way of doing this?  The Spark version is
2.11:2.4.0.  Thanks.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Wenchen Fan
Did you re-create your df when you updated the timezone conf?
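
A minimal PySpark sketch (Spark 2.4 assumed, values and paths are
illustrative) of the re-creation pattern being asked about, so there is no
ambiguity about which time-zone setting the cast and the write observe:

from pyspark.sql import functions as F

def make_df():
    return (spark.createDataFrame([("2019-04-23 09:15:04.0",)], ["ts"])
                 .withColumn("ts", F.col("ts").cast("timestamp")))

for tz in ["Asia/Kolkata", "UTC", "America/Los_Angeles"]:
    spark.conf.set("spark.sql.session.timeZone", tz)
    df = make_df()                       # re-created under the new setting
    df.show(truncate=False)
    df.write.mode("overwrite").orc("/tmp/ts_orc_" + tz.replace("/", "_"))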

On Wed, Apr 24, 2019 at 9:18 PM Shubham Chaurasia 
wrote:

> Writing:
> scala> df.write.orc("")
>
> For looking into contents, I used orc-tools-X.Y.Z-uber.jar (
> https://orc.apache.org/docs/java-tools.html)
>
> On Wed, Apr 24, 2019 at 6:24 PM Wenchen Fan  wrote:
>
>> How did you read/write the timestamp value from/to ORC file?
>>
>> On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia <
>> shubh.chaura...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Consider the following(spark v2.4.0):
>>>
>>> Basically I change values of `spark.sql.session.timeZone` and perform an
>>> orc write. Here are 3 samples:-
>>>
>>> 1)
>>> scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
>>>
>>> scala> val df = sc.parallelize(Seq("2019-04-23
>>> 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
>>> df: org.apache.spark.sql.DataFrame = [ts: timestamp]
>>>
>>> df.show() Output  ORC File Contents
>>> -
>>> 2019-04-23 09:15:04   {"ts":"2019-04-23 09:15:04.0"}
>>>
>>> 2)
>>> scala> spark.conf.set("spark.sql.session.timeZone", "UTC")
>>>
>>> df.show() Output  ORC File Contents
>>> -
>>> 2019-04-23 03:45:04   {"ts":"2019-04-23 09:15:04.0"}
>>>
>>> 3)
>>> scala> spark.conf.set("spark.sql.session.timeZone",
>>> "America/Los_Angeles")
>>>
>>> df.show() Output  ORC File Contents
>>> -
>>> 2019-04-22 20:45:04   {"ts":"2019-04-23 09:15:04.0"}
>>>
>>> It can be seen that in all the three cases it stores {"ts":"2019-04-23
>>> 09:15:04.0"} in orc file. I understand that orc file also contains writer
>>> timezone with respect to which spark is able to convert back to actual time
>>> when it reads orc.(and that is equal to df.show())
>>>
>>> But it's problematic in the sense that it is not adjusting(plus/minus)
>>> timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23
>>> 09:15:04.0"} in ORC file. I mean loading data to any system other than
>>> spark would be a problem.
>>>
>>> Any ideas/suggestions on that?
>>>
>>> PS: For csv files, it stores exactly what we see as the output of
>>> df.show()
>>>
>>> Thanks,
>>> Shubham
>>>
>>>


'No plan for EventTimeWatermark' error while using structured streaming with column pruning (spark 2.3.1)

2019-04-24 Thread kineret M
Hi All,

I get a 'No plan for EventTimeWatermark' error when running a query with
column pruning, using Structured Streaming with a custom data source that
implements the Spark Data Source V2 API.

My data source implementation that handles the schema includes the following:

class MyDataSourceReader extends DataSourceReader with SupportsPushDownRequiredColumns {

  var schema: StructType = createSchema()

  override def readSchema(): StructType = schema

  override def pruneColumns(requiredSchema: StructType) = {
    this.schema = requiredSchema
  }

and then:

class MyDataSourceReaderStream extends MyDataSourceReader { ...

This is my test code:

def x(): Unit = {
  val df1 = sparkSession.readStream.format(myV2Source).load()

  val df2 = df1
    .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
    .withWatermark("epoch", "1 milliseconds")
    .groupBy(col("epoch"), col("id")).count()

  val streamingQuery = df2
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .outputMode(OutputMode.Append())
    .start()

  streamingQuery.awaitTermination()
}

I get the following exception:

Caused by: java.lang.AssertionError: assertion failed: No plan for
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 3.0), 0) *
30.0) as timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L],
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c

at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

Note that in the logical plan I got *DataSourceV2Relation* and not
*StreamingDataSourceV2Relation*, although I use streaming.

Would appreciate the help.


Handle Null Columns in Spark Structured Streaming Kafka

2019-04-24 Thread SNEHASISH DUTTA
Hi,

While writing to Kafka using Spark Structured Streaming, if all the values
in a certain column are null, that column gets dropped.
Is there any way to override this, other than using the na.fill functions?

Regards,
Snehasish


Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Shubham Chaurasia
Writing:
scala> df.write.orc("")

For looking into contents, I used orc-tools-X.Y.Z-uber.jar (
https://orc.apache.org/docs/java-tools.html)

On Wed, Apr 24, 2019 at 6:24 PM Wenchen Fan  wrote:

> How did you read/write the timestamp value from/to ORC file?
>
> On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia <
> shubh.chaura...@gmail.com> wrote:
>
>> Hi All,
>>
>> Consider the following(spark v2.4.0):
>>
>> Basically I change values of `spark.sql.session.timeZone` and perform an
>> orc write. Here are 3 samples:-
>>
>> 1)
>> scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
>>
>> scala> val df = sc.parallelize(Seq("2019-04-23
>> 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
>> df: org.apache.spark.sql.DataFrame = [ts: timestamp]
>>
>> df.show() Output  ORC File Contents
>> -
>> 2019-04-23 09:15:04   {"ts":"2019-04-23 09:15:04.0"}
>>
>> 2)
>> scala> spark.conf.set("spark.sql.session.timeZone", "UTC")
>>
>> df.show() Output  ORC File Contents
>> -
>> 2019-04-23 03:45:04   {"ts":"2019-04-23 09:15:04.0"}
>>
>> 3)
>> scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>
>> df.show() Output  ORC File Contents
>> -
>> 2019-04-22 20:45:04   {"ts":"2019-04-23 09:15:04.0"}
>>
>> It can be seen that in all the three cases it stores {"ts":"2019-04-23
>> 09:15:04.0"} in orc file. I understand that orc file also contains writer
>> timezone with respect to which spark is able to convert back to actual time
>> when it reads orc.(and that is equal to df.show())
>>
>> But it's problematic in the sense that it is not adjusting(plus/minus)
>> timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23
>> 09:15:04.0"} in ORC file. I mean loading data to any system other than
>> spark would be a problem.
>>
>> Any ideas/suggestions on that?
>>
>> PS: For csv files, it stores exactly what we see as the output of
>> df.show()
>>
>> Thanks,
>> Shubham
>>
>>


Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Wenchen Fan
How did you read/write the timestamp value from/to ORC file?

On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia 
wrote:

> Hi All,
>
> Consider the following(spark v2.4.0):
>
> Basically I change values of `spark.sql.session.timeZone` and perform an
> orc write. Here are 3 samples:-
>
> 1)
> scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
>
> scala> val df = sc.parallelize(Seq("2019-04-23
> 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
> df: org.apache.spark.sql.DataFrame = [ts: timestamp]
>
> df.show() Output  ORC File Contents
> -
> 2019-04-23 09:15:04   {"ts":"2019-04-23 09:15:04.0"}
>
> 2)
> scala> spark.conf.set("spark.sql.session.timeZone", "UTC")
>
> df.show() Output  ORC File Contents
> -
> 2019-04-23 03:45:04   {"ts":"2019-04-23 09:15:04.0"}
>
> 3)
> scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>
> df.show() Output  ORC File Contents
> -
> 2019-04-22 20:45:04   {"ts":"2019-04-23 09:15:04.0"}
>
> It can be seen that in all the three cases it stores {"ts":"2019-04-23
> 09:15:04.0"} in orc file. I understand that orc file also contains writer
> timezone with respect to which spark is able to convert back to actual time
> when it reads orc.(and that is equal to df.show())
>
> But it's problematic in the sense that it is not adjusting(plus/minus)
> timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23
> 09:15:04.0"} in ORC file. I mean loading data to any system other than
> spark would be a problem.
>
> Any ideas/suggestions on that?
>
> PS: For csv files, it stores exactly what we see as the output of df.show()
>
> Thanks,
> Shubham
>
>


DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Shubham Chaurasia
Hi All,

Consider the following(spark v2.4.0):

Basically I change values of `spark.sql.session.timeZone` and perform an
orc write. Here are 3 samples:-

1)
scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")

scala> val df = sc.parallelize(Seq("2019-04-23
09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

df.show() Output  ORC File Contents
-
2019-04-23 09:15:04   {"ts":"2019-04-23 09:15:04.0"}

2)
scala> spark.conf.set("spark.sql.session.timeZone", "UTC")

df.show() Output  ORC File Contents
-
2019-04-23 03:45:04   {"ts":"2019-04-23 09:15:04.0"}

3)
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

df.show() Output  ORC File Contents
-
2019-04-22 20:45:04   {"ts":"2019-04-23 09:15:04.0"}

It can be seen that in all three cases it stores {"ts":"2019-04-23
09:15:04.0"} in the ORC file. I understand that the ORC file also contains the
writer time zone, with respect to which Spark is able to convert back to the
actual time when it reads the ORC file (and that matches df.show()).

But it's problematic in the sense that it does not adjust (plus/minus) the
time-zone (spark.sql.session.timeZone) offset for {"ts":"2019-04-23
09:15:04.0"} in the ORC file. I mean that loading the data into any system
other than Spark would be a problem.

Any ideas/suggestions on that?

PS: For csv files, it stores exactly what we see as the output of df.show()

Thanks,
Shubham


Fwd: autoBroadcastJoinThreshold not working as expected

2019-04-24 Thread Mike Chan
Dear all,

I'm looking into a case where, whenever a certain table is broadcast in a
join, the query eventually fails with a remote block error.

First, we set spark.sql.autoBroadcastJoinThreshold to 10 MB, namely 10485760.
[image: image.png]

Then we run the query. In the SQL plan, we found that one table that is 25 MB
in size is broadcast as well.

[image: image.png]

Also, in desc extended the table is 24452111 bytes. It is a Hive table. We
always run into an error when this table is broadcast. Below is a sample
error:

Caused by: java.io.IOException: org.apache.spark.SparkException:
corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625
!= -992055931
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)


Also attached is the physical plan, if you're interested. One thing to note:
if I turn autoBroadcastJoinThreshold down to 5 MB, this query executes
successfully and default.product is NOT broadcast.

However, when I switch to another query that selects even fewer columns than
the previous one, the table still gets broadcast at 5 MB and fails with the
same error. I even changed it to 1 MB, with the same result.


Appreciate if you can share any input. Thank you very much.
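
Not an answer to the corrupt-block error itself, but a minimal sketch
(illustrative table names) of two knobs that are commonly used to keep a
specific table from being auto-broadcast:

# Disable automatic broadcast joins entirely (value in bytes; -1 turns it off).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Or steer the planner per join: broadcast only the table you trust.
from pyspark.sql.functions import broadcast
result = fact_df.join(broadcast(small_dim_df), "store_key", "left")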


Best Regards,

Mike
== Physical Plan ==
*(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 
AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 
END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, 
vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, 
bu_name#273, principle_supplier_code#154 AS supplier_code#476, 
mother_company_name#150 AS supplier_name#477, brand_type_name#117, 
brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, 
coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, 
h1_l1_hierarchy_name#126 AS Category_Name#480, 
coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, 
coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 
more fields]
+- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight
   :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, 
gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, 
compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 41 more fields]
   :  +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], 
[fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight
   : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, 
gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, 
compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 35 more fields]
   : :  +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], 
LeftOuter, BuildRight
   : : :- *(10) Project [product_key#445, store_key#446, 
fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, 
fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, 
cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, 
adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 33 more fields]
   : : :  +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], 
[cast(store_key#155 as double)], LeftOuter, BuildRight
   : : : :- *(10) Project [product_key#445, store_key#446, 
fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, 
fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, 
cogs_amt_local#453, gross_profit_amt_local#454, 

Handle empty partitions in pyspark

2019-04-24 Thread kanchan tewary
Hi All,

I have a situation where the RDD has some empty partitions, which I would
like to identify and handle while applying mapPartitions or similar functions.
Is there a way to do this in PySpark? The isEmpty method works on the RDD only
and cannot be applied to the iterator inside mapPartitions.

Much appreciated.

Code block:

`list1 = [1,2,3,3,6,7,8,12,6,23,45,76,9,10]

r1 = sc.parallelize(list1,20)

def adder(iterator):
if iterator.isEmpty():
yield 'None'
else:
yield sum(iterator)

print(r1.mapPartitions(adder).collect())`
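
A minimal sketch of one way to do this, peeking at the first element instead
of calling a (non-existent) isEmpty on the iterator; it assumes None is not
itself a valid element of the RDD:

from itertools import chain

def adder(iterator):
    first = next(iterator, None)    # None here means the partition is empty
    if first is None:
        yield 'None'
    else:
        yield sum(chain([first], iterator))

print(r1.mapPartitions(adder).collect())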

Thanks & Best Regards,
Kanchan
Data Engineer, IBM


spark stddev() giving '?' as output how to handle it ? i.e replace null/0

2019-04-24 Thread Shyam P
https://stackoverflow.com/questions/55823608/how-to-handle-spark-stddev-function-output-value-when-there-there-is-no-data
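
A minimal PySpark sketch of the usual workaround (column names are
assumptions): stddev returns null for groups with fewer than two rows, and
coalesce replaces that null with 0.0.

from pyspark.sql import functions as F

result = df.groupBy("grp_col").agg(
    F.coalesce(F.stddev("value"), F.lit(0.0)).alias("stddev_value")
)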


Regards,
Shyam