tysonjh commented on a change in pull request #12516: URL: https://github.com/apache/beam/pull/12516#discussion_r470303371
########## File path: sdks/python/apache_beam/dataframe/frames.py ########## @@ -54,6 +54,42 @@ def agg(self, *args, **kwargs): 'order-sensitive') diff = frame_base.wont_implement_method('order-sensitive') + @frame_base.args_to_kwargs(pd.Series) + def nlargest(self, **kwargs): + if 'keep' in kwargs and kwargs['keep'] != 'all': + raise frame_base.WontImplementError('order-sensitive') + per_partition = expressions.ComputedExpression( + 'nlargest-per-partition', + lambda df: df.nlargest(**kwargs), [self._expr], + preserves_partition_by=partitionings.Singleton(), + requires_partition_by=partitionings.Nothing()) + with expressions.allow_non_parallel_operations(True): + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'nlargest', + lambda df: df.nlargest(**kwargs), [per_partition], + preserves_partition_by=partitionings.Singleton(), + requires_partition_by=partitionings.Singleton())) + + @frame_base.args_to_kwargs(pd.Series) + def nsmallest(self, **kwargs): + if 'keep' in kwargs and kwargs['keep'] != 'all': + raise frame_base.WontImplementError('order-sensitive') + per_partition = expressions.ComputedExpression( + 'nsmallest-per-partition', + lambda df: df.nsmallest(**kwargs), [self._expr], + preserves_partition_by=partitionings.Singleton(), + requires_partition_by=partitionings.Nothing()) + with expressions.allow_non_parallel_operations(True): + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'nsmallest', + lambda df: df.nsmallest(**kwargs), [per_partition], + preserves_partition_by=partitionings.Singleton(), + requires_partition_by=partitionings.Singleton())) + + rename_axis = frame_base._elementwise_method('rename_axis') Review comment: Should this be up with the other elementwise_methods (e.g. `prod`) in the class? Or closer to the call site? I'm just trying to understand why it is here. Also, I tried to follow along the implementation of `elementwise_method` but got lost at the `_proxy_function`. What does this do? ########## File path: sdks/python/apache_beam/dataframe/frames.py ########## @@ -54,6 +54,42 @@ def agg(self, *args, **kwargs): 'order-sensitive') diff = frame_base.wont_implement_method('order-sensitive') + @frame_base.args_to_kwargs(pd.Series) + def nlargest(self, **kwargs): + if 'keep' in kwargs and kwargs['keep'] != 'all': + raise frame_base.WontImplementError('order-sensitive') Review comment: How does 'keep' relate to the order sensitivity mentioned in the error? Using 'keep' can result in a possibly larger than n Series returned. Shouldn't this be using 'first' to return the first n occurrences? ########## File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py ########## @@ -121,6 +130,12 @@ def test_series_tests(self): 'pandas.core.series.Series.cumsum': ['*'], 'pandas.core.series.Series.cumprod': ['*'], 'pandas.core.series.Series.diff': ['*'], + 'pandas.core.series.Series.nlargest': [ + "s.nlargest(3, keep='last')" Review comment: 'first' here too? ########## File path: sdks/python/apache_beam/dataframe/frames.py ########## @@ -54,6 +54,42 @@ def agg(self, *args, **kwargs): 'order-sensitive') diff = frame_base.wont_implement_method('order-sensitive') + @frame_base.args_to_kwargs(pd.Series) + def nlargest(self, **kwargs): + if 'keep' in kwargs and kwargs['keep'] != 'all': + raise frame_base.WontImplementError('order-sensitive') + per_partition = expressions.ComputedExpression( + 'nlargest-per-partition', + lambda df: df.nlargest(**kwargs), [self._expr], + preserves_partition_by=partitionings.Singleton(), + requires_partition_by=partitionings.Nothing()) + with expressions.allow_non_parallel_operations(True): + return frame_base.DeferredFrame.wrap( Review comment: Are `DeferredFrame` expressions that are configured at pipeline construction but passed to workers for execution? ########## File path: sdks/python/apache_beam/dataframe/frames.py ########## @@ -200,9 +236,178 @@ def dropna(self, axis, **kwargs): items = itertuples = iterrows = iteritems = frame_base.wont_implement_method( 'non-lazy') + def _cols_as_temporary_index(self, cols, suffix=''): + original_index_names = list(self._expr.proxy().index.names) + new_index_names = [ + '__apache_beam_temp_%d_%s' % (ix, suffix) + for (ix, _) in enumerate(original_index_names)] + def revert(df): + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'join_restoreindex', + lambda df: + df.reset_index().set_index(new_index_names) + .rename_axis(index=original_index_names, copy=False), + [df._expr], + preserves_partition_by=partitionings.Nothing(), + requires_partition_by=partitionings.Nothing())) + reindexed = frame_base.DeferredFrame.wrap( Review comment: Why is this not defined as a method like `revert` is above? ########## File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py ########## @@ -43,6 +43,13 @@ def test_dataframe_tests(self): 'pandas.core.frame.DataFrame.itertuples': ['*'], 'pandas.core.frame.DataFrame.iterrows': ['*'], 'pandas.core.frame.DataFrame.iteritems': ['*'], + 'pandas.core.frame.DataFrame.nlargest': [ + "df.nlargest(3, 'population', keep='last')" Review comment: keep='first' as well? ########## File path: sdks/python/apache_beam/dataframe/frames.py ########## @@ -54,6 +54,42 @@ def agg(self, *args, **kwargs): 'order-sensitive') diff = frame_base.wont_implement_method('order-sensitive') + @frame_base.args_to_kwargs(pd.Series) + def nlargest(self, **kwargs): Review comment: Documentation? The method name itself is pretty self documenting but at least on the parameters? I had to go look them up from pd.Series. Similarly for other public methods in this class, documentation would be helpful and go a long way towards making this more accessible for contributors. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org