I'm afraid I'm also against the proposal so far.
What's wrong with going with "1. Functions" and using `transform`, which
allows chaining functions?
I'm not sure what you mean by "manage the namespaces", though.
from pyspark.sql import functions as F

def with_price(df, factor: float = 2.0):
    return df.withColumn("price", F.col("price") * factor)

df.transform(with_price).show()
I have to admit that the current transform is a bit annoying when the
function takes parameters:
df.transform(lambda input_df: with_price(input_df, 100)).show()
but we can improve `transform` to pass the parameters through to the function.
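For example, you can already bind the extra argument up front with
functools.partial, and transform itself could be extended to forward extra
arguments (just a sketch, not something pyspark provides today):

from functools import partial

df.transform(partial(with_price, factor=100)).show()

# a possible extension of transform, roughly:
# def transform(self, func, *args, **kwargs):
#     return func(self, *args, **kwargs)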
Or I'd also recommend using a wrapper, as Maciej suggested, but without
delegating all functions.
I'd expose only the functions that are really necessary; otherwise managing
the dataframe becomes rather difficult.
For example, with a MyBusinessDataFrame
<https://gist.github.com/pabloalcain/de79938507ad2d823a866238b3c8a66e#file-dynamic_dataframe_minimal-py-L28-L30>,
base_dataframe = spark.createDataFrame(
    data=[['product_1', 2], ['product_2', 4]],
    schema=["name", "price"],
)
dyn_business = MyBusinessDataFrame(base_dataframe)
dyn_business.select("name").my_business_query(2.0)
will raise an AnalysisException because the price column doesn't exist
anymore.
We should manage the dataframe properly in the wrapper.
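For illustration, this is roughly what I mean (a rough sketch; BusinessData
and names are just hypothetical names):

from pyspark.sql import DataFrame, functions as F

class BusinessData:
    def __init__(self, df: DataFrame):
        self._df = df  # kept private, so callers can't drop columns we rely on

    def my_business_query(self, factor: float = 2.0) -> "BusinessData":
        return BusinessData(self._df.withColumn("price", F.col("price") * factor))

    def names(self) -> DataFrame:
        # deliberately leave the wrapper where a plain DataFrame is wanted
        return self._df.select("name")

With something like this, the select that broke the example above is simply
not reachable from the outside.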
Thanks.
On Wed, Dec 29, 2021 at 8:49 AM Maciej <[email protected]> wrote:
> On 12/29/21 16:18, Pablo Alcain wrote:
> > Hey Maciej! Thanks for your answer and the comments :)
> >
> > On Wed, Dec 29, 2021 at 3:06 PM Maciej <[email protected]
> > <mailto:[email protected]>> wrote:
> >
> > This seems like a lot of trouble for a not-so-common use case that has
> > viable alternatives. Once you assume that the class is intended for
> > inheritance (which, arguably, we neither do nor imply at the moment)
> > you're even more restricted than we are right now, according to the
> > project policy and the need for keeping things synchronized across all
> > languages.
> >
> > By "this" do you mean the modification of the DataFrame, the
> > implementation of a new pyspark class (DynamicDataFrame in this case),
> > or the approach in general?
>
> I mean promoting DataFrame as extensible in general. It risks getting
> stuck with a specific API, even more than we are right now, with
> little reward at the end.
>
> Additionally:
>
> - As far as I am aware, nothing suggests that it is a widely requested
> feature (the corresponding SO questions didn't get much traffic over the
> years and I don't think we have any preceding JIRA tickets).
> - It can be addressed outside the project (within the user codebase or as
> a standalone package) with minimal or no overhead.
>
> That being said ‒ if we're going to rewrite Python DataFrame methods to
> return the instance type, I strongly believe that the existing methods
> should be marked as final.
>
> >
> >
> >
> > On the Scala side, I would rather expect to see type classes than direct
> > inheritance, so this might be a dead feature from the start.
> >
> > As for Python (sorry if I missed something in the preceding discussion),
> > quite a natural approach would be to wrap the DataFrame instance in your
> > business class and delegate calls to the wrapped object. A very naive
> > implementation could look like this
> >
> > from functools import wraps
> > from pyspark.sql import DataFrame
> >
> > class BusinessModel:
> >     @classmethod
> >     def delegate(cls, a):
> >         def _(*args, **kwargs):
> >             result = a(*args, **kwargs)
> >             if isinstance(result, DataFrame):
> >                 return cls(result)
> >             else:
> >                 return result
> >
> >         if callable(a):
> >             return wraps(a)(_)
> >         else:
> >             return a
> >
> >     def __init__(self, df):
> >         self._df = df
> >
> >     def __getattr__(self, name):
> >         return BusinessModel.delegate(getattr(self._df, name))
> >
> >     def with_price(self, price=42):
> >         return self.selectExpr("*", f"{price} as price")
> >
> >
> >
> > Yes, effectively the solution is very similar to this one. I believe
> > that the advantage of doing it without hijacking the delegation with the
> > decorator is that you can still maintain static typing.
>
> You can maintain type checker compatibility (it is easier with stubs,
> but you can do it with inline hints as well, if I recall correctly) here
> as well.
>
> > On the other
> > hand (and this is probably a minor issue), when following this approach
> > with the `isinstance` check for the casting you might end up casting
> > the results of `.summary()` and `.describe()`, which you probably still
> > want to keep as "pure" DataFrames. Seen from this perspective,
> > "DynamicDataFrame" would be the boilerplate code that allows you to
> > decide more granularly which methods you want to delegate.
>
> You can do it with `__getattr__` as well. There are probably some edge
> cases (especially when accessing columns with `.`), but it should
> still be manageable.
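> For example, something along these lines (an untested sketch; PASSTHROUGH
> is just a hypothetical allow-list, reusing the BusinessModel above):
>
> PASSTHROUGH = {"summary", "describe"}  # keep these as plain results
>
> def __getattr__(self, name):
>     attr = getattr(self._df, name)
>     if name in PASSTHROUGH:
>         return attr
>     return BusinessModel.delegate(attr)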
>
>
> Just to be clear ‒ I am not insisting that this is somehow a superior
> solution (there are things that cannot be done through delegation).
>
> >
> > (BusinessModel(spark.createDataFrame([(1, "DEC")], ("id", "month")))
> >     .select("id")
> >     .with_price(0.0)
> >     .select("price")
> >     .show())
> >
> >
> > but it can be easily adjusted to handle more complex use cases,
> > including inheritance.
> >
> >
> >
> > On 12/29/21 12:54, Pablo Alcain wrote:
> > > Hey everyone! I'm re-sending this e-mail, now with a PR proposal
> > > (https://github.com/apache/spark/pull/35045 if you want to take a look
> > > at the code with a couple of examples). The proposed change includes
> > > only a new class that would extend the Python API without any change
> > > to the underlying Scala code. The benefit would be that the new code
> > > only extends previous functionality without breaking any existing
> > > application code, allowing pyspark users to try it out and see if it
> > > turns out to be useful. Hyukjin Kwon (https://github.com/HyukjinKwon)
> > > commented that a drawback would be that, if we do this, it would be
> > > hard to deprecate the `DynamicDataFrame` API later. The other option,
> > > if we want this inheritance to be feasible, is to implement this
> > > "casting" directly in the `DataFrame` code, so for example it would
> > > change from
> > >
> > > def limit(self, num: int) -> "DataFrame":
> > >     jdf = self._jdf.limit(num)
> > >     return DataFrame(jdf, self.sql_ctx)
> > >
> > > to
> > >
> > > def limit(self, num: int) -> "DataFrame":
> > >     jdf = self._jdf.limit(num)
> > >     return self.__class__(jdf, self.sql_ctx)  # type(self) would work as well
> > >
> > > This approach would probably require similar changes on the Scala API
> > > in order to allow this kind of inheritance on the Scala side as well
> > > (unfortunately I'm not knowledgeable enough in Scala to figure out
> > > what the changes would be exactly).
> > >
> > > I wanted to gather your input on this idea, whether you think it
> > can be
> > > helpful or not, and what would be the best strategy, in your
> > opinion, to
> > > pursue it.
> > >
> > > Thank you very much!
> > > Pablo
> > >
> > > On Thu, Nov 4, 2021 at 9:44 PM Pablo Alcain <[email protected]> wrote:
> > >
> > > tl;dr: a proposal for a pyspark "DynamicDataFrame" class that would
> > > make it easier to inherit from DataFrame while keeping the dataframe
> > > methods chainable.
> > >
> > > Hello everyone. We have been working for a long time with
> PySpark
> > > and more specifically with DataFrames. In our pipelines we have
> > > several tables, with specific purposes, that we usually load as
> > > DataFrames. As you might expect, there are a handful of
> > queries and
> > > transformations per dataframe that are done many times, so we
> > > thought of ways that we could abstract them:
> > >
> > > 1. Functions: using functions that take dataframes and return them
> > > transformed. It had a couple of pitfalls: we had to manage the
> > > namespaces carefully, and also the "chainability" didn't feel very
> > > pyspark-y.
> > > 2. MonkeyPatching DataFrame: we monkeypatched
> > > (https://stackoverflow.com/questions/5626193/what-is-monkey-patching)
> > > methods with the regularly done queries inside the DataFrame class.
> > > This one kept it pyspark-y, but there was no easy way to handle
> > > segregated namespaces.
> > > 3. Inheritance: create the class `MyBusinessDataFrame`, inherit
> > > from `DataFrame` and implement the methods there. This one solves
> > > all the issues, but with a caveat: the chainable methods cast the
> > > result explicitly to `DataFrame` (see
> > > https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py#L1910
> > > e.g.). Therefore, every time you use one of the parent's methods you'd
> > > have to re-cast to `MyBusinessDataFrame`, making the code cumbersome
> > > (see the sketch below).
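> > >
> > > A minimal sketch of that caveat (hypothetical my_business_query, given
> > > some DataFrame df with a price column):
> > >
> > > class MyBusinessDataFrame(DataFrame):
> > >     def my_business_query(self, factor: float) -> "MyBusinessDataFrame":
> > >         priced = self.withColumn("price", self["price"] * factor)
> > >         return MyBusinessDataFrame(priced._jdf, self.sql_ctx)
> > >
> > > bdf = MyBusinessDataFrame(df._jdf, df.sql_ctx)
> > > bdf.limit(10).my_business_query(2.0)  # AttributeError: limit() returned
> > >                                       # a plain DataFrame, so we re-cast
> > >                                       # after every parent method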
> > >
> > > In view of these pitfalls we decided to go for a slightly different
> > > approach, inspired by #3: we created a class called
> > > `DynamicDataFrame` that overrides the explicit call to `DataFrame`
> > > done in PySpark and instead casts dynamically to `self.__class__` (see
> > > https://gist.github.com/pabloalcain/de79938507ad2d823a866238b3c8a66e#file-dynamic_dataframe_minimal-py-L21
> > > e.g.). This allows the fluent methods to always keep the same class,
> > > making chainability as smooth as it is with pyspark dataframes.
> > >
> > > As an example implementation, here's a link to a gist
> > > (https://gist.github.com/pabloalcain/de79938507ad2d823a866238b3c8a66e)
> > > that dynamically implements the `withColumn` and `select` methods and
> > > shows the expected output.
> > >
> > > I'm sharing this here in case you feel like this approach can
> be
> > > useful for anyone else. In our case it greatly sped up the
> > > development of abstraction layers and allowed us to write
> cleaner
> > > code. One of the advantages is that it would simply be a "plugin"
> > > over pyspark that does not modify any existing code or
> > > application interfaces.
> > >
> > > If you think that this can be helpful, I can write a PR as a
> more
> > > refined proof of concept.
> > >
> > > Thanks!
> > >
> > > Pablo
> > >
> >
> >
> > --
> > Best regards,
> > Maciej Szymkiewicz
> >
> > Web: https://zero323.net
> > PGP: A30CEF0C31A501EC
> >
>
>
> --
> Best regards,
> Maciej Szymkiewicz
>
> Web: https://zero323.net
> PGP: A30CEF0C31A501EC
>