I see that the algorithm can be improved and made more fault tolerant, as
both of you outlined.
Could you explain in a little more detail why the following happens:

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-04|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

i.e. why it is so clear that the record is filled not with the last known
good value but with "the next one"?
Why does it depend on the partitions?
Since the broadcast map is available to all partitions, shouldn't the result
be the same regardless of partitioning?
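
One way to picture the dependence on partitioning is the minimal sketch
below. It is plain Scala without Spark, and it assumes (this is not taken
from the gist) that the broadcast map holds, per partition index, the last
row of that partition with a defined foo, or None. The names CarrySketch,
carry, twoPartitions and sparsePartitions are made up for illustration, and
FooBar is simplified to hold the date as a String.

```scala
object CarrySketch {
  // Simplified stand-in for the FooBar rows in this thread (date as String).
  final case class FooBar(foo: Option[String], bar: String)

  // For each partition index, keep the last row whose foo is defined, or
  // None if the partition is empty or holds no valid row.
  def carry(partitions: Seq[Seq[FooBar]]): Map[Int, Option[FooBar]] =
    partitions.zipWithIndex.map { case (rows, i) =>
      i -> rows.filter(_.foo.isDefined).lastOption
    }.toMap

  val rows: Seq[FooBar] = Seq(
    FooBar(Some("2016-01-01"), "first"),
    FooBar(Some("2016-01-02"), "second"),
    FooBar(None, "noValidFormat"),
    FooBar(Some("2016-01-04"), "lastAssumingSameDate")
  )

  // Two partitions with two rows each: no entry of the carry map is None.
  val twoPartitions: Seq[Seq[FooBar]] = rows.grouped(2).toList

  // An empty partition followed by one row per partition: None entries appear.
  val sparsePartitions: Seq[Seq[FooBar]] =
    Seq(Seq.empty[FooBar]) ++ rows.map(r => Seq(r))
}
```

With two partitions the map has no None entry, matching the
Map(1 -> ..., 0 -> ...) quoted below. In the sparse layout the partition
holding noValidFormat (index 3) maps to None, and a lookup at i + 1 lands on
the 2016-01-04 row, while 2016-01-02 sits at i - 1. So the map is the same
for every partition, but its contents change with the layout.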

The fix (too simple to be generally applicable)

      if (lastNotNullRow == None) {
        lastNotNullRow = toCarryBd.value.get(i + 1).get
      }

which chooses the next element, is only applied when the partition does not
contain new values.
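
For a forward fill, the lookup would have to go backwards rather than to
i + 1, and skip empty partitions as Holden suggested. A minimal sketch in
plain Scala follows, assuming the carry map stores the last valid row per
partition index (or None); ForwardFillLookup, lastGoodBefore, fillPartition
and sampleCarry are hypothetical names, not part of the gist.

```scala
object ForwardFillLookup {
  // Simplified stand-in for the FooBar rows in this thread (date as String).
  final case class FooBar(foo: Option[String], bar: String)

  // Walk backwards from partition i - 1 and return the first carried row.
  // Returns None when no earlier partition holds a valid row, e.g. when the
  // very first record of the dataset is invalid.
  def lastGoodBefore(carry: Map[Int, Option[FooBar]], i: Int): Option[FooBar] =
    (i - 1 to 0 by -1).iterator
      .map(j => carry.getOrElse(j, None))
      .collectFirst { case Some(row) => row }

  // Forward fill one partition: seed with the value carried over from earlier
  // partitions, then replace each missing foo with the last valid one seen.
  def fillPartition(i: Int, rows: Iterator[FooBar],
                    carry: Map[Int, Option[FooBar]]): Iterator[FooBar] = {
    var last: Option[String] = lastGoodBefore(carry, i).flatMap(_.foo)
    rows.map { row =>
      if (row.foo.isDefined) { last = row.foo; row }
      else row.copy(foo = last) // stays None if nothing valid came before
    }
  }

  // Example carry map with empty partitions (0, 2 and 3) in between.
  val sampleCarry: Map[Int, Option[FooBar]] = Map[Int, Option[FooBar]](
    0 -> None,
    1 -> Some(FooBar(Some("2016-01-02"), "second")),
    2 -> None,
    3 -> None,
    4 -> Some(FooBar(Some("2016-01-04"), "lastAssumingSameDate"))
  )
}
```

In Spark, fillPartition could be passed to mapPartitionsWithIndex with the
broadcast value as the carry argument; an empty partition simply yields an
empty iterator.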

Kind regards,
Georg

Liang-Chi Hsieh <vii...@gmail.com> wrote on Thu, 12 Jan 2017 at 03:48:

>
> Hi Georg,
>
> It is not strange. As I said before, it depends on how the data is
> partitioned.
>
> When you try to get the available value from next partition like this:
>
> var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
>       if (lastNotNullRow == None) {
>         lastNotNullRow = toCarryBd.value.get(i + 1).get
>       }
>
> You may need to make sure the next partition has a value too. As Holden
> pointed out before, you need to handle the case where the previous/next
> partition is empty too, and keep going until you find a non-empty partition.
>
>
>
> geoHeil wrote
> > Hi Liang-Chi Hsieh,
> >
> > Strange. You reported that the "toCarry" returned in your test was:
> >
> > Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
> > Some(FooBar(Some(2016-01-02),second)))
> >
> > For me it always looked like:
> >
> > ###################### carry
> > Map(2 -> None, 5 -> None, 4 -> None, 7 ->
> > Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 ->
> > Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)),
> > 6 -> None, 0 -> None)
> > (2,None)
> > (5,None)
> > (4,None)
> > (7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
> > (1,Some(FooBar(2016-01-01,first)))
> > (3,Some(FooBar(2016-01-02,second)))
> > (6,None)
> > (0,None)
> > ()
> > ###################### carry
> >
> >
> > I updated the code to contain a fixed default parallelism
> > .set("spark.default.parallelism", "12")
> >
> > Also:
> > I updated the sample code:
> > https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2
> >
> > To cope with "empty/ none" partitions I added
> >
> > var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
> >       if (lastNotNullRow == None) {
> >         lastNotNullRow = toCarryBd.value.get(i + 1).get
> >       }
> >
> >
> > But that will result in
> >
> > +----------+--------------------+
> > |       foo|                 bar|
> > +----------+--------------------+
> > |2016-01-01|               first|
> > |2016-01-02|              second|
> > |      null|       noValidFormat|
> > |2016-01-04|lastAssumingSameDate|
> > +----------+--------------------+
> >
> > +----------+--------------------+
> > |       foo|                 bar|
> > +----------+--------------------+
> > |2016-01-01|               first|
> > |2016-01-02|              second|
> > |2016-01-04|       noValidFormat|
> > |2016-01-04|lastAssumingSameDate|
> > +----------+--------------------+
> >
> > As you can see, noValidFormat should have been filled with 2016-01-02,
> > the last known good value (forward fill).
> > Cheers,
> > Georg
> >
> > Liang-Chi Hsieh <viirya@...> wrote on Mon, 9 Jan 2017 at 09:08:
> >
> >>
> >> The map "toCarry" will return (partitionIndex, None) for an empty
> >> partition.
> >>
> >> So I think line 51 won't fail. Line 58 can fail if "lastNotNullRow" is
> >> None. You should of course check whether an Option has a value before you
> >> access it.
> >>
> >> The "toCarry" returned when I tested your code is the following:
> >>
> >> Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
> >> Some(FooBar(Some(2016-01-02),second)))
> >>
> >> As you can see, there is no None, so the code works without failure. But of
> >> course it depends on how your data is partitioned.
> >>
> >> For an empty partition, mapPartitions just gives you an empty iterator as
> >> input. You can do what you need. You already return a None when you find an
> >> empty iterator in preparing "toCarry". So I was wondering what you wanted to
> >> ask in the previous reply.
> >>
> >>
> >>
> >> geoHeil wrote
> >> > Thanks a lot, Holden.
> >> >
> >> > @Liang-Chi Hsieh, did you try to run
> >> > https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2
> >> > yourself? For me it crashes in either line 51 or 58. Holden described
> >> > the problem pretty well. Is it clear to you now?
> >> >
> >> > Cheers,
> >> > Georg
> >> >
> >> > Holden Karau [via Apache Spark Developers List]
> >> > <ml-node+s1001551n20516h45@.nabble> wrote on Mon, 9 Jan 2017 at 06:40:
> >> >
> >> >> Hi Georg,
> >> >>
> >> >> Thanks for the question along with the code (as well as posting to Stack
> >> >> Overflow). In general, if a question is well suited for Stack Overflow,
> >> >> it's probably better suited to the user@ list instead of the dev@ list,
> >> >> so I've cc'd the user@ list for you.
> >> >>
> >> >> As far as handling empty partitions when working with mapPartitions (and
> >> >> similar), the general approach is to return an empty iterator of the
> >> >> correct type when you have an empty input iterator.
> >> >>
> >> >> It looks like your code is doing this; however, it seems likely that you
> >> >> have a bug in your application logic. Namely, it assumes that if a
> >> >> partition has a record missing a value, it will either have had a previous
> >> >> good row in the same partition OR the previous partition is not empty and
> >> >> has a good row, which need not necessarily be the case. You've partially
> >> >> fixed this problem by going through and collecting the last good value for
> >> >> each partition, and then, if you don't have a good value at the start of a
> >> >> partition, looking up the value in the collected array.
> >> >>
> >> >> However, if this happens at the same time the previous partition is empty,
> >> >> you will need to go and look up the previous-previous partition's value,
> >> >> and so on, until you find the one you are looking for. (Note this assumes
> >> >> that the first record in your dataset is valid; if it isn't, your code
> >> >> will still fail.)
> >> >>
> >> >> Your solution is really close to working but just has some minor
> >> >> assumptions which don't always hold.
> >> >>
> >> >> Cheers,
> >> >>
> >> >> Holden :)
> >> >> On Sun, Jan 8, 2017 at 8:30 PM, Liang-Chi Hsieh <[hidden email]>
> >> >> wrote:
> >> >>
> >> >>
> >> >> Hi Georg,
> >> >>
> >> >> Can you describe your question more clearly?
> >> >>
> >> >> Actually, the example code you posted on Stack Overflow doesn't crash as
> >> >> you said in the post.
> >> >>
> >> >>
> >> >> geoHeil wrote
> >> >> > I am working on building a custom ML pipeline model / estimator to
> >> >> > impute missing values, e.g. I want to fill with the last known good
> >> >> > value. Using a window function is slow / will put the data into a
> >> >> > single partition.
> >> >> > I built some sample code to use the RDD API; however, it has some
> >> >> > None / null problems with empty partitions.
> >> >> >
> >> >> > How should this be implemented properly to handle such empty
> >> >> > partitions?
> >> >> >
> >> >> > http://stackoverflow.com/questions/41474175/spark-mappartitionswithindex-handling-empty-partitions
> >> >> >
> >> >> > Kind regards,
> >> >> > Georg
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> -----
> >> >> Liang-Chi Hsieh | @viirya
> >> >> Spark Technology Center
> >> >> http://www.spark.tc/
> >> >> --
> >> >> View this message in context:
> >> >> http://apache-spark-developers-list.1001551.n3.nabble.com/handling-of-empty-partitions-tp20496p20515.html
> >> >> Sent from the Apache Spark Developers List mailing list archive at
> >> >> Nabble.com.
> >>
>
