Hi Liang-Chi Hsieh,

Strange: you say the "toCarry" returned when you tested my code was:
Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 -> Some(FooBar(Some(2016-01-02),second)))

For me it always looked like this:

    ######################
    carry
    Map(2 -> None, 5 -> None, 4 -> None, 7 -> Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 -> Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)), 6 -> None, 0 -> None)
    (2,None)
    (5,None)
    (4,None)
    (7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
    (1,Some(FooBar(2016-01-01,first)))
    (3,Some(FooBar(2016-01-02,second)))
    (6,None)
    (0,None)
    ()
    ######################
    carry

I updated the code to use a fixed default parallelism:

    .set("spark.default.parallelism", "12")

I also updated the sample code: https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2

To cope with empty/None partitions I added:

    var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
    if (lastNotNullRow == None) {
      lastNotNullRow = toCarryBd.value.get(i + 1).get
    }

But that results in

    +----------+--------------------+
    |       foo|                 bar|
    +----------+--------------------+
    |2016-01-01|               first|
    |2016-01-02|              second|
    |      null|       noValidFormat|
    |2016-01-04|lastAssumingSameDate|
    +----------+--------------------+

becoming

    +----------+--------------------+
    |       foo|                 bar|
    +----------+--------------------+
    |2016-01-01|               first|
    |2016-01-02|              second|
    |2016-01-04|       noValidFormat|
    |2016-01-04|lastAssumingSameDate|
    +----------+--------------------+

You can see that noValidFormat should have been filled with 2016-01-02, the last known good value (forward fill), not with 2016-01-04.

Cheers,
Georg
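For reference, a minimal sketch of the lookup this needs: instead of falling back to partition i + 1, which pulls a value from a later partition, walk backwards through the earlier partitions until one of them carried a value. It assumes, as in the gist, that "toCarry" maps each partition index to the last non-null row seen in that partition, with None for empty partitions; the FooBar shape and the helper name lastGoodValueBefore are illustrative assumptions, not code from the gist.

    import scala.annotation.tailrec

    // Assumed record shape, matching the FooBar values printed above.
    case class FooBar(foo: Option[java.sql.Date], bar: String)

    // Hypothetical helper: the nearest partition at or before index i that
    // carried a last good row; empty partitions map to None and are skipped.
    @tailrec
    def lastGoodValueBefore(carry: Map[Int, Option[FooBar]], i: Int): Option[FooBar] =
      if (i < 0) None // walked past the first partition: nothing earlier to carry
      else carry.getOrElse(i, None) match {
        case found @ Some(_) => found                             // partition i carried a value
        case None            => lastGoodValueBefore(carry, i - 1) // empty: keep walking back
      }

Seeding a partition with lastGoodValueBefore(toCarryBd.value, i - 1) instead of toCarryBd.value.get(i + 1).get would, for the carry map printed above (assuming the noValidFormat row landed between partitions 3 and 7), fill it from partition 3's Some(FooBar(2016-01-02,second)).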
Liang-Chi Hsieh <vii...@gmail.com> wrote on Mon, 9 Jan 2017 at 09:08:

> The map "toCarry" will return you (partitionIndex, None) for an empty
> partition, so I think line 51 won't fail. Line 58 can fail if
> "lastNotNullRow" is None. You should of course check whether an Option has
> a value before you access it.
>
> The "toCarry" returned when I tested your code was:
>
> Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
> Some(FooBar(Some(2016-01-02),second)))
>
> As you can see, there is no None, so the code works without failure. But of
> course it depends on how your data is partitioned.
>
> For an empty partition, mapPartitions just gives you an empty iterator as
> input, and you can do what you need with it. You already return a None when
> you find an empty iterator while preparing "toCarry". So I was wondering
> what you wanted to ask in your previous reply.
>
>
> geoHeil wrote
> > Thanks a lot, Holden.
> >
> > @Liang-Chi Hsieh did you try to run
> > https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2? For me
> > it crashes at either line 51 or line 58. Holden described the problem
> > pretty well. Is it clear for you now?
> >
> > Cheers,
> > Georg
> >
> > Holden Karau [via Apache Spark Developers List]
> > <ml-node+s1001551n20516h45@.nabble> wrote on Mon, 9 Jan 2017 at 06:40:
> >
> >> Hi Georg,
> >>
> >> Thanks for the question along with the code (as well as posting to Stack
> >> Overflow). In general, if a question is well suited for Stack Overflow,
> >> it is probably better suited to the user@ list than the dev@ list, so
> >> I've cc'd the user@ list for you.
> >>
> >> As far as handling empty partitions when working with mapPartitions (and
> >> similar), the general approach is to return an empty iterator of the
> >> correct type when you have an empty input iterator.
> >>
> >> It looks like your code is doing this; however, you likely have a bug in
> >> your application logic: it assumes that if a partition has a record
> >> missing a value, then either an earlier row in the same partition was
> >> good, OR the previous partition is non-empty and has a good row, which
> >> need not be the case. You have partially fixed this by collecting, for
> >> each partition, the last good value, and then, when you don't have a
> >> good value at the start of a partition, looking it up in the collected
> >> array.
> >>
> >> However, if the previous partition is also empty at the same time, you
> >> will need to go and look up the previous previous partition's value,
> >> until you find the one you are looking for. (Note this assumes that the
> >> first record in your dataset is valid; if it isn't, your code will
> >> still fail.)
> >>
> >> Your solution is really close to working; it just makes some minor
> >> assumptions which don't always hold.
> >>
> >> Cheers,
> >>
> >> Holden :)
> >>
> >> On Sun, Jan 8, 2017 at 8:30 PM, Liang-Chi Hsieh <[hidden email]> wrote:
> >>
> >> Hi Georg,
> >>
> >> Can you describe your question more clearly?
> >>
> >> Actually, the example code you posted on Stack Overflow doesn't crash as
> >> you said in the post.
> >>
> >> geoHeil wrote
> >> > I am working on building a custom ML pipeline-model / estimator to
> >> > impute missing values, e.g. I want to fill with the last good known
> >> > value. Using a window function is slow / will put the data into a
> >> > single partition. I built some sample code using the RDD API; however,
> >> > it has some None / null problems with empty partitions.
> >> >
> >> > How should this be implemented properly to handle such empty
> >> > partitions?
> >> >
> >> > http://stackoverflow.com/questions/41474175/spark-mappartitionswithindex-handling-empty-partitions
> >> >
> >> > Kind regards,
> >> > Georg
> >>
> >> -----
> >> Liang-Chi Hsieh | @viirya
> >> Spark Technology Center
> >> http://www.spark.tc/
> >>
> >> --
> >> Cell: 425-233-8271
> >> Twitter: https://twitter.com/holdenkarau
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
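For completeness, a sketch of the full two-pass forward fill this thread converges on, under the same assumptions as the earlier sketch (FooBar and lastGoodValueBefore as defined there; forwardFill is a hypothetical name, not the gist's code). Pass one records each partition's last good row, returning None for an empty input iterator exactly as described above; pass two seeds each partition by scanning backwards over the earlier partitions:

    import org.apache.spark.rdd.RDD

    def forwardFill(rdd: RDD[FooBar]): RDD[FooBar] = {
      // Pass 1: per partition, remember the last row that has a date. An empty
      // input iterator folds straight to None, marking the partition as empty.
      val toCarry: Map[Int, Option[FooBar]] =
        rdd.mapPartitionsWithIndex { case (i, iter) =>
          Iterator((i, iter.foldLeft(Option.empty[FooBar]) { (last, row) =>
            if (row.foo.isDefined) Some(row) else last
          }))
        }.collect().toMap

      val toCarryBd = rdd.sparkContext.broadcast(toCarry)

      // Pass 2: fill missing dates, seeding each partition from the nearest
      // earlier partition that carried a value (lastGoodValueBefore, above),
      // then carrying forward within the partition as rows stream past.
      rdd.mapPartitionsWithIndex { case (i, iter) =>
        var lastNotNullRow = lastGoodValueBefore(toCarryBd.value, i - 1)
        iter.map { row =>
          val filled =
            if (row.foo.isDefined) row
            else row.copy(foo = lastNotNullRow.flatMap(_.foo)) // stays None if nothing earlier
          if (row.foo.isDefined) lastNotNullRow = Some(row)
          filled
        }
      }
    }

With the sample data above, the partition holding the noValidFormat row would be seeded from partition 3, so its foo becomes 2016-01-02 instead of 2016-01-04; and if the very first rows of the dataset are null there is simply nothing to carry, matching Holden's caveat about the first record.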