Hi Liang-Chi Hsieh,

Strange. You wrote that "toCarry" returned the following when you tested
my code:

Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
Some(FooBar(Some(2016-01-02),second)))

For me, however, it always looks like this:

###################### carry
Map(2 -> None, 5 -> None, 4 -> None, 7 ->
Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 ->
Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)),
6 -> None, 0 -> None)
(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)
()
###################### carry
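
For reference, a map of this shape can be reproduced without Spark. The
sketch below is only an illustration under assumptions: each partition is
modelled as a plain Seq, FooBar is a simplified stand-in for the class in the
gist (the date kept as an Option[String]), and buildCarry is a hypothetical
helper name, not something taken from the gist.

```scala
object CarrySketch {
  // Hypothetical stand-in for the gist's row type; foo is the nullable date column.
  final case class FooBar(foo: Option[String], bar: String)

  // For every partition index, keep the last row whose foo is defined.
  // An empty partition, or one without any valid row, maps to None.
  def buildCarry(partitions: Seq[Seq[FooBar]]): Map[Int, Option[FooBar]] =
    partitions.zipWithIndex.map { case (rows, i) =>
      i -> rows.filter(_.foo.isDefined).lastOption
    }.toMap
}
```

An empty partition ends up as None here, which corresponds to entries such
as "2 -> None" in the output above.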


I updated the code to use a fixed default parallelism:

.set("spark.default.parallelism", "12")

I also updated the sample code:
https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2

To cope with empty (None) partitions I added:

var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
if (lastNotNullRow == None) {
  lastNotNullRow = toCarryBd.value.get(i + 1).get
}


But that results in the following (input first, then the filled output):

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-04|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

As you can see, noValidFormat should have been filled with 2016-01-02, the
last known good value (forward fill), not with 2016-01-04.

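A minimal sketch of the look-back Holden describes, written against plain
Scala collections instead of the RDD API so that it runs without Spark. It
assumes that carry(j) holds the last good row of partition j; FooBar is a
simplified stand-in for the class in the gist, and the names lastGoodBefore
and fillPartition are hypothetical.

```scala
object ForwardFillSketch {
  // Hypothetical stand-in for the gist's row type; foo is the nullable date column.
  final case class FooBar(foo: Option[String], bar: String)

  // Walk backwards from partition i - 1 to partition 0 and return the first
  // carried row that is defined. Returns None if no earlier partition has one.
  def lastGoodBefore(carry: Map[Int, Option[FooBar]], i: Int): Option[FooBar] =
    (i - 1 to 0 by -1)
      .map(j => carry.getOrElse(j, None))
      .collectFirst { case Some(row) => row }

  // Forward fill inside one partition, seeded with the value carried in
  // from the earlier partitions (if any).
  def fillPartition(rows: Seq[FooBar], seed: Option[FooBar]): Seq[FooBar] = {
    // Running "last known foo" after each row; tail drops the initial seed entry.
    val filled = rows
      .scanLeft(seed.flatMap(_.foo))((last, row) => row.foo.orElse(last))
      .tail
    rows.zip(filled).map { case (row, foo) => row.copy(foo = foo) }
  }
}
```

With the carry map printed above, lastGoodBefore(carry, 5) skips the empty
partition 4 and returns the row from partition 3 (2016-01-02, second). If the
noValidFormat row sat in partition 5, fillPartition would then give it
2016-01-02. Looking only backwards, never at i + 1, is what keeps a later
date from leaking into an earlier row.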
Cheers,
Georg

Liang-Chi Hsieh <vii...@gmail.com> wrote on Mon, 9 Jan 2017 at 09:08:

>
> The map "toCarry" will give you (partitionIndex, None) for an empty
> partition.
>
> So I think line 51 won't fail. Line 58 can fail if "lastNotNullRow" is
> None. You should of course check whether an Option has a value before you
> access it.
>
> The "toCarry" returned is the following when I tested your code:
>
> Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
> Some(FooBar(Some(2016-01-02),second)))
>
> As you can see, there is no None, so the code works without failure. But of
> course it depends on how your data is partitioned.
>
> For an empty partition, mapPartitions just gives you an empty iterator as
> input, and you can do what you need with it. You already return a None when
> you find an empty iterator while preparing "toCarry". So I was wondering
> what you wanted to ask in the previous reply.
>
>
>
> geoHeil wrote
> > Thanks a lot, Holden.
> >
> > @Liang-Chi Hsieh did you try to run
> > https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2 ? For me
> > it crashes in either line 51 or 58. Holden described the problem
> > pretty well. Is it clear to you now?
> >
> > Cheers,
> > Georg
> >
> > Holden Karau [via Apache Spark Developers List]
> > <ml-node+s1001551n20516h45@.nabble> wrote on Mon, 9 Jan 2017 at 06:40:
> >
> >> Hi Georg,
> >>
> >> Thanks for the question along with the code (as well as posting to Stack
> >> Overflow). In general, if a question is well suited for Stack Overflow,
> >> it's probably better suited to the user@ list than the dev@ list, so I've
> >> cc'd the user@ list for you.
> >>
> >> As far as handling empty partitions when working with mapPartitions (and
> >> similar), the general approach is to return an empty iterator of the
> >> correct type when you have an empty input iterator.
> >>
> >> It looks like your code is doing this; however, it seems like you likely
> >> have a bug in your application logic. Namely, it assumes that if a
> >> partition has a record missing a value, either there is a previous good
> >> row in the same partition, or the previous partition is not empty and
> >> has a good row, which need not be the case. You've partially fixed this
> >> problem by collecting the last good value of each partition and, if
> >> you don't have a good value at the start of a partition, looking up the
> >> value in the collected array.
> >>
> >> However, if this happens while the previous partition is also empty,
> >> you will need to look up the partition before that, and so on, until
> >> you find the value you are looking for. (Note this assumes that the
> >> first record in your dataset is valid; if it isn't, your code will still
> >> fail.)
> >>
> >> Your solution is really close to working but makes some minor
> >> assumptions which don't always hold.
> >>
> >> Cheers,
> >>
> >> Holden :)
> >> On Sun, Jan 8, 2017 at 8:30 PM, Liang-Chi Hsieh <[hidden email]> wrote:
> >>
> >>
> >> Hi Georg,
> >>
> >> Can you describe your question more clearly?
> >>
> >> Actually, the example code you posted on Stack Overflow doesn't crash as
> >> you said in the post.
> >>
> >>
> >> geoHeil wrote
> >> > I am working on building a custom ML pipeline model / estimator to
> >> > impute missing values, e.g. I want to fill with the last known good
> >> > value. Using a window function is slow / will put the data into a
> >> > single partition.
> >> > I built some sample code that uses the RDD API; however, it has some
> >> > None / null problems with empty partitions.
> >> >
> >> > How should this be implemented properly to handle such empty
> >> > partitions?
> >> >
> >> > http://stackoverflow.com/questions/41474175/spark-mappartitionswithindex-handling-empty-partitions
> >> >
> >> > Kind regards,
> >> > Georg
> >>
> >> -----
> >> Liang-Chi Hsieh | @viirya
> >> Spark Technology Center
> >> http://www.spark.tc/
> >>
> >> --
> >> View this message in context:
> >> http://apache-spark-developers-list.1001551.n3.nabble.com/handling-of-empty-partitions-tp20496p20515.html
> >> Sent from the Apache Spark Developers List mailing list archive at
> >> Nabble.com.
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/