Re: repartition in df vs partitionBy in df
Hello, there is another link here that I hope will help you: https://stackoverflow.com/questions/33831561/pyspark-repartition-vs-partitionby In particular, when you are faced with possible data skew, or have partitioning parameters that can only be determined at runtime, this article may also help: https://software.intel.com/en-us/articles/spark-sql-adaptive-execution-at-100-tb
Re: repartition in df vs partitionBy in df
Hello, thanks for the quick reply, got it: partitionBy creates something like Hive partitions. But when do we actually use repartition, and how do we decide whether to repartition at all? In development we only get sample data. Also, what number should I pass to repartition? Thanks. On Thu, 25 Apr 2019, 10:31 moqi wrote: > Hello, I think you can refer to this link; I hope it helps you. > https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby/40417992
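A minimal PySpark sketch of the distinction, to make the two calls concrete; the sample data, the number of partitions, and the output path are all placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000).withColumnRenamed("id", "value")  # hypothetical sample data

# repartition: reshuffles the rows in memory into N partitions (a full shuffle).
# It controls parallelism and the number of output files, not the directory layout.
df_repartitioned = df.repartition(8)
print(df_repartitioned.rdd.getNumPartitions())  # 8

# partitionBy: a DataFrameWriter option; it lays the output out on disk in
# Hive-style directories (bucket=0/, bucket=1/, ...) without changing the
# in-memory partitioning of df.
df.withColumn("bucket", df["value"] % 4) \
  .write.mode("overwrite").partitionBy("bucket").parquet("/tmp/partitioned_output")

As for what number to pass to repartition, a common rule of thumb is a small multiple of the total executor cores, or whatever keeps each partition at a manageable size; with only sample data in development, the number usually has to be revisited against production data volumes.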
Re: repartition in df vs partitionBy in df
Hello, I think you can refer to this link; I hope it helps you: https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby/40417992
Re: repartition in df vs partitionBy in df
Hi All, can anyone explain? Thanks, Rajat. On Sun, 21 Apr 2019, 00:18 kumar.rajat20del wrote: > Hi Spark Users, > > repartition and partitionBy seem to be very similar for a DataFrame. > In which scenario do we use each one? > > As per my understanding, repartition is a very expensive operation as it needs a > full shuffle, so when do we use repartition? > > Thanks > Rajat
Re: [pyspark] Use output of one aggregated function for another aggregated function within the same groupby
Use analytical window functions to rank the result and then filter to the desired rank. Rishi Shah wrote on Thu, 25 Apr 2019 at 05:07: > Hi All, > > [PySpark 2.3, python 2.7] > > I would like to achieve something like this, could you please suggest the best > way to implement it (perhaps highlighting pros & cons of the approach in terms of > performance)? > > df = df.groupby('grp_col').agg(max(date).alias('max_date'), count(when > col('file_date') == col('max_date'))) > > Please note 'max_date' is the result of the aggregate function max inside the > group-by agg. I can definitely use multiple groupbys to achieve this, but is > there a better way, maybe performance-wise? > > Appreciate your help! > > -- > Regards, > > Rishi Shah >
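A hedged sketch along these lines in PySpark 2.3, using a per-group window max rather than an explicit rank; the DataFrame df and the column names (grp_col, date, file_date) follow the question, and the exact aggregation is only an assumption about the intent:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Attach the per-group max date with a window, then count the rows whose
# file_date matches it, all in a single pass over df.
w = Window.partitionBy("grp_col")
result = (df
          .withColumn("max_date", F.max("date").over(w))
          .groupBy("grp_col", "max_date")
          .agg(F.sum(F.when(F.col("file_date") == F.col("max_date"), 1)
                      .otherwise(0)).alias("rows_at_max_date")))

Whether this beats two separate groupBys depends on the data: the window adds a shuffle by grp_col, but it avoids joining the aggregated result back onto the original DataFrame.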
[pyspark] Use output of one aggregated function for another aggregated function within the same groupby
Hi All, [PySpark 2.3, python 2.7] I would like to achieve something like this, could you please suggest the best way to implement it (perhaps highlighting pros & cons of the approach in terms of performance)? df = df.groupby('grp_col').agg(max(date).alias('max_date'), count(when col('file_date') == col('max_date'))) Please note 'max_date' is the result of the aggregate function max inside the group-by agg. I can definitely use multiple groupbys to achieve this, but is there a better way, maybe performance-wise? Appreciate your help! -- Regards, Rishi Shah
RDD vs Dataframe & when to persist
Hello All, I run into situations where I ask myself whether I should write a mapPartitions function on the RDD or use the DataFrame API all the way (withColumn + groupBy). I am using PySpark 2.3 (Python 2.7). I understand we should be utilizing DataFrames as much as possible, but at times it feels like an RDD function would give more flexible code. Could you please advise when to prefer one approach over the other? (Keeping pandas UDFs in mind, which approach makes more sense in which scenarios?) Also, how does it affect performance, that is, using DataFrames all the way vs an RDD mapPartitions function? Another question that always arises is when to persist a DataFrame. Should we repartition before a group by? If so, without persist, will it affect performance? Any help is much appreciated. Thanks, -Rishi
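Not a definitive answer, but for mapPartitions-style flexibility while staying in the DataFrame API, Spark 2.3 offers grouped-map pandas UDFs (they require pyarrow). A minimal sketch, assuming a DataFrame df with hypothetical columns grp_col and value:

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

out_schema = StructType([
    StructField("grp_col", StringType()),
    StructField("score", DoubleType()),
])

@F.pandas_udf(out_schema, F.PandasUDFType.GROUPED_MAP)
def per_group(pdf):
    # Arbitrary per-group pandas logic, similar in spirit to what one might
    # otherwise write inside an RDD mapPartitions function.
    return pd.DataFrame([[pdf["grp_col"].iloc[0], float(pdf["value"].mean())]],
                        columns=["grp_col", "score"])

result = df.groupby("grp_col").apply(per_group)

On the other questions: persisting generally only pays off when the same DataFrame feeds more than one action, and a groupBy aggregation already shuffles by the grouping key, so an extra repartition right before it is usually redundant.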
Re: Is it possible to obtain the full command to be invoked by SparkLauncher?
Thanks for the pointers. We figured out the stdout/stderr capture piece. I was just looking to capture the full command in order to help debug issues we run into with the submit depending on various combinations of all parameters/classpath, and also to isolate job specific issues from our wrapping application (i.e. being able to submit the job directly, rather than through our app). I will use the environment variable method for now. On Wed, Apr 24, 2019 at 4:18 PM Marcelo Vanzin wrote: > > BTW the SparkLauncher API has hooks to capture the stderr of the > spark-submit process into the logging system of the parent process. > Check the API javadocs since it's been forever since I looked at that. > > On Wed, Apr 24, 2019 at 1:58 PM Marcelo Vanzin wrote: > > > > Setting the SPARK_PRINT_LAUNCH_COMMAND env variable to 1 in the > > launcher env will make Spark code print the command to stderr. Not > > optimal but I think it's the only current option. > > > > On Wed, Apr 24, 2019 at 1:55 PM Jeff Evans > > wrote: > > > > > > The org.apache.spark.launcher.SparkLauncher is used to construct a > > > spark-submit invocation programmatically, via a builder pattern. In > > > our application, which uses a SparkLauncher internally, I would like > > > to log the full spark-submit command that it will invoke to our log > > > file, in order to aid in debugging/support. However, I can't figure > > > out a way to do this. This snippet would work, except for the fact > > > that the createBuilder method is private. > > > > > > sparkLauncher.createBuilder().command() > > > > > > Is there an alternate way of doing this? The Spark version is > > > 2.11:2.4.0. Thanks. > > > > > > - > > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > > > > > > > > -- > > Marcelo > > > > -- > Marcelo - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Is it possible to obtain the full command to be invoked by SparkLauncher?
BTW the SparkLauncher API has hooks to capture the stderr of the spark-submit process into the logging system of the parent process. Check the API javadocs since it's been forever since I looked at that. On Wed, Apr 24, 2019 at 1:58 PM Marcelo Vanzin wrote: > > Setting the SPARK_PRINT_LAUNCH_COMMAND env variable to 1 in the > launcher env will make Spark code print the command to stderr. Not > optimal but I think it's the only current option. > > On Wed, Apr 24, 2019 at 1:55 PM Jeff Evans > wrote: > > > > The org.apache.spark.launcher.SparkLauncher is used to construct a > > spark-submit invocation programmatically, via a builder pattern. In > > our application, which uses a SparkLauncher internally, I would like > > to log the full spark-submit command that it will invoke to our log > > file, in order to aid in debugging/support. However, I can't figure > > out a way to do this. This snippet would work, except for the fact > > that the createBuilder method is private. > > > > sparkLauncher.createBuilder().command() > > > > Is there an alternate way of doing this? The Spark version is > > 2.11:2.4.0. Thanks. > > > > - > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > > > > -- > Marcelo -- Marcelo - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Is it possible to obtain the full command to be invoked by SparkLauncher?
You could set the env var SPARK_PRINT_LAUNCH_COMMAND and spark-submit will print it, but it will be printed by the subprocess and not yours unless you redirect the subprocess output. Also, the command is what spark-submit generates, so it is quite a bit more verbose and includes the classpath etc. I think the only alternative, if the above is not enough, is to get hold of the builder; you might need to extend the launcher and put your class in the same package, since the builder is not accessible from outside. On Wed, 24 Apr 2019 at 21:55, Jeff Evans wrote: > The org.apache.spark.launcher.SparkLauncher is used to construct a > spark-submit invocation programmatically, via a builder pattern. In > our application, which uses a SparkLauncher internally, I would like > to log the full spark-submit command that it will invoke to our log > file, in order to aid in debugging/support. However, I can't figure > out a way to do this. This snippet would work, except for the fact > that the createBuilder method is private. > > sparkLauncher.createBuilder().command() > > Is there an alternate way of doing this? The Spark version is > 2.11:2.4.0. Thanks. >
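To make the environment-variable route concrete, here is a hedged sketch that drives spark-submit through a plain Python subprocess instead of SparkLauncher (the class name, jar path, and arguments are placeholders); per the replies below, the expanded command is printed by the child process to its stderr, so that stream has to be captured:

import os
import subprocess

env = dict(os.environ)
env["SPARK_PRINT_LAUNCH_COMMAND"] = "1"  # ask the launcher to print the full command

proc = subprocess.run(
    ["spark-submit", "--class", "com.example.Main", "/path/to/app.jar"],  # placeholders
    env=env,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

# The expanded command (classpath and all) appears on stderr ahead of the application output.
print(proc.stderr.decode())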
Re: Is it possible to obtain the full command to be invoked by SparkLauncher?
Setting the SPARK_PRINT_LAUNCH_COMMAND env variable to 1 in the launcher env will make Spark code print the command to stderr. Not optimal but I think it's the only current option. On Wed, Apr 24, 2019 at 1:55 PM Jeff Evans wrote: > > The org.apache.spark.launcher.SparkLauncher is used to construct a > spark-submit invocation programmatically, via a builder pattern. In > our application, which uses a SparkLauncher internally, I would like > to log the full spark-submit command that it will invoke to our log > file, in order to aid in debugging/support. However, I can't figure > out a way to do this. This snippet would work, except for the fact > that the createBuilder method is private. > > sparkLauncher.createBuilder().command() > > Is there an alternate way of doing this? The Spark version is > 2.11:2.4.0. Thanks. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > -- Marcelo - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Is it possible to obtain the full command to be invoked by SparkLauncher?
The org.apache.spark.launcher.SparkLauncher is used to construct a spark-submit invocation programmatically, via a builder pattern. In our application, which uses a SparkLauncher internally, I would like to log the full spark-submit command that it will invoke to our log file, in order to aid in debugging/support. However, I can't figure out a way to do this. This snippet would work, except for the fact that the createBuilder method is private. sparkLauncher.createBuilder().command() Is there an alternate way of doing this? The Spark version is 2.11:2.4.0. Thanks. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files
Did you re-create your df when you update the timezone conf? On Wed, Apr 24, 2019 at 9:18 PM Shubham Chaurasia wrote: > Writing: > scala> df.write.orc("") > > For looking into contents, I used orc-tools-X.Y.Z-uber.jar ( > https://orc.apache.org/docs/java-tools.html) > > On Wed, Apr 24, 2019 at 6:24 PM Wenchen Fan wrote: > >> How did you read/write the timestamp value from/to ORC file? >> >> On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia < >> shubh.chaura...@gmail.com> wrote: >> >>> Hi All, >>> >>> Consider the following(spark v2.4.0): >>> >>> Basically I change values of `spark.sql.session.timeZone` and perform an >>> orc write. Here are 3 samples:- >>> >>> 1) >>> scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata") >>> >>> scala> val df = sc.parallelize(Seq("2019-04-23 >>> 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp")) >>> df: org.apache.spark.sql.DataFrame = [ts: timestamp] >>> >>> df.show() Output ORC File Contents >>> - >>> 2019-04-23 09:15:04 {"ts":"2019-04-23 09:15:04.0"} >>> >>> 2) >>> scala> spark.conf.set("spark.sql.session.timeZone", "UTC") >>> >>> df.show() Output ORC File Contents >>> - >>> 2019-04-23 03:45:04 {"ts":"2019-04-23 09:15:04.0"} >>> >>> 3) >>> scala> spark.conf.set("spark.sql.session.timeZone", >>> "America/Los_Angeles") >>> >>> df.show() Output ORC File Contents >>> - >>> 2019-04-22 20:45:04 {"ts":"2019-04-23 09:15:04.0"} >>> >>> It can be seen that in all the three cases it stores {"ts":"2019-04-23 >>> 09:15:04.0"} in orc file. I understand that orc file also contains writer >>> timezone with respect to which spark is able to convert back to actual time >>> when it reads orc.(and that is equal to df.show()) >>> >>> But it's problematic in the sense that it is not adjusting(plus/minus) >>> timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23 >>> 09:15:04.0"} in ORC file. I mean loading data to any system other than >>> spark would be a problem. >>> >>> Any ideas/suggestions on that? >>> >>> PS: For csv files, it stores exactly what we see as the output of >>> df.show() >>> >>> Thanks, >>> Shubham >>> >>>
'No plan for EventTimeWatermark' error while using structured streaming with column pruning (spark 2.3.1)
Hi All, I get 'No plan for EventTimeWatermark' error while doing a query with columns pruning using structured streaming with a custom data source that implements Spark datasource v2. My data source implementation that handles the schemas includes the following:

class MyDataSourceReader extends DataSourceReader with SupportsPushDownRequiredColumns {

  var schema: StructType = createSchema()

  override def readSchema(): StructType = schema

  override def pruneColumns(requiredSchema: StructType) = {
    this.schema = requiredSchema
  }

and then:

class MyDataSourceReaderStream extends MyDataSourceReader { ...

This is my test code:

def x(): Unit = {
  val df1 = sparkSession.readStream.format(myV2Source).load()

  val df2 = df1
    .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
    .withWatermark("epoch", "1 milliseconds")
    .groupBy(col("epoch"), col("id")).count()

  val streamingQuery = df2
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .outputMode(OutputMode.Append())
    .start()

  streamingQuery.awaitTermination()
}

I get the following exception:

Caused by: java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L], com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

Note that in the logical plan I got *DataSourceV2Relation* and not *StreamingDataSourceV2Relation*, although I use streaming. Would appreciate the help.
Handle Null Columns in Spark Structured Streaming Kafka
Hi, While writing to Kafka using Spark Structured Streaming, if all the values in a certain column are null, that column gets dropped. Is there any way to override this, other than using the na.fill functions? Regards, Snehasish
Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files
Writing: scala> df.write.orc("") For looking into contents, I used orc-tools-X.Y.Z-uber.jar ( https://orc.apache.org/docs/java-tools.html) On Wed, Apr 24, 2019 at 6:24 PM Wenchen Fan wrote: > How did you read/write the timestamp value from/to ORC file? > > On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia < > shubh.chaura...@gmail.com> wrote: > >> Hi All, >> >> Consider the following(spark v2.4.0): >> >> Basically I change values of `spark.sql.session.timeZone` and perform an >> orc write. Here are 3 samples:- >> >> 1) >> scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata") >> >> scala> val df = sc.parallelize(Seq("2019-04-23 >> 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp")) >> df: org.apache.spark.sql.DataFrame = [ts: timestamp] >> >> df.show() Output ORC File Contents >> - >> 2019-04-23 09:15:04 {"ts":"2019-04-23 09:15:04.0"} >> >> 2) >> scala> spark.conf.set("spark.sql.session.timeZone", "UTC") >> >> df.show() Output ORC File Contents >> - >> 2019-04-23 03:45:04 {"ts":"2019-04-23 09:15:04.0"} >> >> 3) >> scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") >> >> df.show() Output ORC File Contents >> - >> 2019-04-22 20:45:04 {"ts":"2019-04-23 09:15:04.0"} >> >> It can be seen that in all the three cases it stores {"ts":"2019-04-23 >> 09:15:04.0"} in orc file. I understand that orc file also contains writer >> timezone with respect to which spark is able to convert back to actual time >> when it reads orc.(and that is equal to df.show()) >> >> But it's problematic in the sense that it is not adjusting(plus/minus) >> timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23 >> 09:15:04.0"} in ORC file. I mean loading data to any system other than >> spark would be a problem. >> >> Any ideas/suggestions on that? >> >> PS: For csv files, it stores exactly what we see as the output of >> df.show() >> >> Thanks, >> Shubham >> >>
Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files
How did you read/write the timestamp value from/to ORC file? On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia wrote: > Hi All, > > Consider the following(spark v2.4.0): > > Basically I change values of `spark.sql.session.timeZone` and perform an > orc write. Here are 3 samples:- > > 1) > scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata") > > scala> val df = sc.parallelize(Seq("2019-04-23 > 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp")) > df: org.apache.spark.sql.DataFrame = [ts: timestamp] > > df.show() Output ORC File Contents > - > 2019-04-23 09:15:04 {"ts":"2019-04-23 09:15:04.0"} > > 2) > scala> spark.conf.set("spark.sql.session.timeZone", "UTC") > > df.show() Output ORC File Contents > - > 2019-04-23 03:45:04 {"ts":"2019-04-23 09:15:04.0"} > > 3) > scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") > > df.show() Output ORC File Contents > - > 2019-04-22 20:45:04 {"ts":"2019-04-23 09:15:04.0"} > > It can be seen that in all the three cases it stores {"ts":"2019-04-23 > 09:15:04.0"} in orc file. I understand that orc file also contains writer > timezone with respect to which spark is able to convert back to actual time > when it reads orc.(and that is equal to df.show()) > > But it's problematic in the sense that it is not adjusting(plus/minus) > timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23 > 09:15:04.0"} in ORC file. I mean loading data to any system other than > spark would be a problem. > > Any ideas/suggestions on that? > > PS: For csv files, it stores exactly what we see as the output of df.show() > > Thanks, > Shubham > >
DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files
Hi All,

Consider the following (spark v2.4.0). Basically, I change the value of `spark.sql.session.timeZone` and perform an ORC write. Here are 3 samples:

1)
scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")

scala> val df = sc.parallelize(Seq("2019-04-23 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

df.show() Output: 2019-04-23 09:15:04
ORC File Contents: {"ts":"2019-04-23 09:15:04.0"}

2)
scala> spark.conf.set("spark.sql.session.timeZone", "UTC")

df.show() Output: 2019-04-23 03:45:04
ORC File Contents: {"ts":"2019-04-23 09:15:04.0"}

3)
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

df.show() Output: 2019-04-22 20:45:04
ORC File Contents: {"ts":"2019-04-23 09:15:04.0"}

It can be seen that in all three cases it stores {"ts":"2019-04-23 09:15:04.0"} in the ORC file. I understand that the ORC file also contains the writer timezone, with respect to which Spark is able to convert back to the actual time when it reads the ORC (and that is equal to df.show()).

But it's problematic in the sense that it is not adjusting (plus/minus) the timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23 09:15:04.0"} in the ORC file. I mean, loading the data into any system other than Spark would be a problem.

Any ideas/suggestions on that?

PS: For CSV files, it stores exactly what we see as the output of df.show().

Thanks,
Shubham
Fwd: autoBroadcastJoinThreshold not working as expected
Dear all, I'm looking into a case where, when a certain table is involved in a broadcast join, the query eventually fails with a remote block error. First, we set spark.sql.autoBroadcastJoinThreshold to 10MB, namely 10485760. [image: image.png] Then we proceed to run the query. In the SQL plan, we found that a table that is 25MB in size is broadcast as well. [image: image.png] Also, per desc extended, the table is 24452111 bytes. It is a Hive table. We always run into the error when this table is broadcast. Below is the sample error:

Caused by: java.io.IOException: org.apache.spark.SparkException: corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625 != -992055931
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)

I have also attached the physical plan if you're interested. One thing to note: if I turn autoBroadcastJoinThreshold down to 5MB, this query executes successfully and default.product is NOT broadcast. However, when I change to another query that selects even fewer columns than the previous one, even at 5MB this table still gets broadcast and the query fails with the same error. I even changed it to 1MB and it is still the same. Appreciate it if you can share any input. Thank you very much. Best Regards, Mike

== Physical Plan ==
*(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, bu_name#273, principle_supplier_code#154 AS supplier_code#476, mother_company_name#150 AS supplier_name#477, brand_type_name#117, brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, h1_l1_hierarchy_name#126 AS Category_Name#480, coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 more fields] +- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 
41 more fields] : +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], [fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 35 more fields] : : +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], LeftOuter, BuildRight : : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 33 more fields] : : : +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], [cast(store_key#155 as double)], LeftOuter, BuildRight : : : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454,
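Not a root-cause fix for the corrupt-block error, but a common way to confirm that the broadcast itself is the trigger is to disable automatic broadcast joins for the session, re-run, and then restore the threshold. A hedged PySpark sketch; the query text is a placeholder for the failing SQL above:

failing_query = "SELECT ..."  # placeholder for the query whose plan is shown above

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # -1 disables size-based auto-broadcast
spark.sql(failing_query).collect()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)  # restore the original 10MB threshold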
Handle empty partitions in pyspark
Hi All, I have a situation where the RDD has some empty partitions, which I would like to identify and handle while applying mapPartitions or similar functions. Is there a way to do this in PySpark? The isEmpty method works only on the RDD itself and cannot be applied to a partition iterator. Much appreciated. Code block:

list1 = [1, 2, 3, 3, 6, 7, 8, 12, 6, 23, 45, 76, 9, 10]
r1 = sc.parallelize(list1, 20)

def adder(iterator):
    if iterator.isEmpty():
        yield 'None'
    else:
        yield sum(iterator)

print(r1.mapPartitions(adder).collect())

Thanks & Best Regards, Kanchan, Data Engineer, IBM
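Python iterators have no isEmpty method, but a partition can be probed by trying to pull its first element. A hedged sketch of one way to adapt the code above to that idea:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

list1 = [1, 2, 3, 3, 6, 7, 8, 12, 6, 23, 45, 76, 9, 10]
r1 = sc.parallelize(list1, 20)  # 20 partitions for 14 elements, so several are empty

def adder(iterator):
    # Pull the first element to detect an empty partition.
    try:
        first = next(iterator)
    except StopIteration:
        yield 'None'  # empty partition, mirroring the intent above
        return
    yield first + sum(iterator)  # the first element plus the rest of the partition

print(r1.mapPartitions(adder).collect())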
spark stddev() giving '?' as output, how to handle it? i.e. replace null/0
https://stackoverflow.com/questions/55823608/how-to-handle-spark-stddev-function-output-value-when-there-there-is-no-data Regards, Shyam
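For reference, when a group has too few rows, stddev (sample standard deviation) comes back as null or NaN rather than 0, which some clients render as '?'. A hedged PySpark sketch of replacing it with 0; the DataFrame and the column names grp_col and value are made up:

from pyspark.sql import functions as F

# nanvl maps a NaN stddev to 0, coalesce maps a null stddev to 0.
result = (df.groupBy("grp_col")
            .agg(F.coalesce(F.nanvl(F.stddev("value"), F.lit(0.0)),
                            F.lit(0.0)).alias("value_stddev")))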