robertwb commented on a change in pull request #12516:
URL: https://github.com/apache/beam/pull/12516#discussion_r473172807

########## File path: sdks/python/apache_beam/dataframe/frames.py ##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
           'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):

Review comment:
All of these public methods are proxies for the (extensively documented) Pandas methods of the same name.

########## File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py ##########
@@ -121,6 +130,12 @@ def test_series_tests(self):
             'pandas.core.series.Series.cumsum': ['*'],
             'pandas.core.series.Series.cumprod': ['*'],
             'pandas.core.series.Series.diff': ['*'],
+            'pandas.core.series.Series.nlargest': [
+                "s.nlargest(3, keep='last')"

Review comment:
Same.

########## File path: sdks/python/apache_beam/dataframe/frames.py ##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
           'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nlargest-per-partition',
+        lambda df: df.nlargest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(

Review comment:
Technically, it is the Expressions they wrap that are passed down to workers for execution, but yes, that's the basic idea.
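A minimal sketch, in plain pandas, of the two-stage pattern in the nlargest diff: each partition first computes its own nlargest, and a second nlargest over the combined per-partition results gives the final answer. The list of Series used as "partitions" and the function name `two_stage_nlargest` are hypothetical stand-ins for Beam's partitioned execution; the sketch only checks that the two stages agree with a single nlargest over all the data when keep='all'.

```python
import pandas as pd


def two_stage_nlargest(partitions, n):
    """Hypothetical helper: top-n via per-partition top-n, then a global top-n.

    `partitions` is a list of pandas Series standing in for Beam partitions.
    """
    # Stage 1: each partition keeps only its own top n values
    # (keep='all' retains rows tied at the cutoff).
    per_partition = [part.nlargest(n, keep='all') for part in partitions]
    # Stage 2: gather the small per-partition results in one place
    # and take the top n again.
    combined = pd.concat(per_partition)
    return combined.nlargest(n, keep='all')


s = pd.Series([5, 1, 9, 3, 9, 7, 2, 8], index=list('abcdefgh'))
partitions = [s.iloc[:3], s.iloc[3:6], s.iloc[6:]]

result = two_stage_nlargest(partitions, 3)
expected = s.nlargest(3, keep='all')
print(result.sort_index().equals(expected.sort_index()))  # True
```

The first stage can run on each partition independently, and only its small output has to be brought together for the second stage, which is why the diff's per-partition expression requires no partitioning while the final one requires a single partition.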

########## File path: sdks/python/apache_beam/dataframe/frames.py ##########
@@ -200,9 +236,178 @@ def dropna(self, axis, **kwargs):
   items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
       'non-lazy')
+  def _cols_as_temporary_index(self, cols, suffix=''):
+    original_index_names = list(self._expr.proxy().index.names)
+    new_index_names = [
+        '__apache_beam_temp_%d_%s' % (ix, suffix)
+        for (ix, _) in enumerate(original_index_names)]
+    def revert(df):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'join_restoreindex',
+              lambda df:
+                  df.reset_index().set_index(new_index_names)
+                  .rename_axis(index=original_index_names, copy=False),
+              [df._expr],
+              preserves_partition_by=partitionings.Nothing(),
+              requires_partition_by=partitionings.Nothing()))
+    reindexed = frame_base.DeferredFrame.wrap(

Review comment:
Good question. Only because it was always invoked right away. Updated to be a method for symmetry.

########## File path: sdks/python/apache_beam/dataframe/frames.py ##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
           'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nlargest-per-partition',
+        lambda df: df.nlargest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'nlargest',
+              lambda df: df.nlargest(**kwargs), [per_partition],
+              preserves_partition_by=partitionings.Singleton(),
+              requires_partition_by=partitionings.Singleton()))
+  @frame_base.args_to_kwargs(pd.Series)
+  def nsmallest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nsmallest-per-partition',
+        lambda df: df.nsmallest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'nsmallest',
+              lambda df: df.nsmallest(**kwargs), [per_partition],
+              preserves_partition_by=partitionings.Singleton(),
+              requires_partition_by=partitionings.Singleton()))
+
+  rename_axis = frame_base._elementwise_method('rename_axis')

Review comment:
Given that I'm going for full coverage, I tried to keep them in basic alphabetical order, unless there is an obvious relationship between methods.

As for elementwise_method, it basically creates a method that returns `expressions.ComputedExpression(name, lambda df: getattr(df, name)(*args, **kwargs), [self._expr], preserves_partition_by=partitionings.Singleton(), requires_partition_by=partitionings.Nothing())`, but it also handles a mix of deferred and non-deferred arguments. It was such a common pattern that I made a helper for it.

########## File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py ##########
@@ -43,6 +43,13 @@ def test_dataframe_tests(self):
             'pandas.core.frame.DataFrame.itertuples': ['*'],
             'pandas.core.frame.DataFrame.iterrows': ['*'],
             'pandas.core.frame.DataFrame.iteritems': ['*'],
+            'pandas.core.frame.DataFrame.nlargest': [
+                "df.nlargest(3, 'population', keep='last')"

Review comment:
This skips only that specific example from the docstring.
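The elementwise_method helper described in the rename_axis review comment can be illustrated with a toy, eager-thunk version. Everything here is hypothetical: `DeferredSeries` and `elementwise_method` are simplified stand-ins, not Beam's classes, and the sketch leaves out the ComputedExpression and partitioning details entirely. It only shows the idea of a factory that manufactures proxy methods which call the pandas method of the same name later, resolving any mix of deferred and plain arguments at that point.

```python
import pandas as pd


def elementwise_method(name):
    """Hypothetical factory: builds a method that lazily calls pandas' `name`."""

    def resolve(value):
        # Deferred arguments are evaluated; plain values pass through as-is.
        return value.evaluate() if isinstance(value, DeferredSeries) else value

    def method(self, *args, **kwargs):
        def compute():
            target = self.evaluate()
            real_args = [resolve(a) for a in args]
            real_kwargs = {k: resolve(v) for k, v in kwargs.items()}
            return getattr(target, name)(*real_args, **real_kwargs)
        # Return a new deferred object instead of computing anything now.
        return type(self)(compute)

    method.__name__ = name
    return method


class DeferredSeries:
    """Hypothetical wrapper around a zero-argument function returning a Series."""

    def __init__(self, compute):
        self._compute = compute

    def evaluate(self):
        return self._compute()

    # One line per proxied pandas method, mirroring `rename_axis = ...` above.
    add = elementwise_method('add')
    rename_axis = elementwise_method('rename_axis')


s = DeferredSeries(lambda: pd.Series([1, 2, 3]))
other = DeferredSeries(lambda: pd.Series([10, 20, 30]))
out = s.add(other).rename_axis('idx')  # nothing is computed yet
print(out.evaluate().tolist())  # [11, 22, 33]
```

The payoff of the helper is visible in the class body: each additional proxied method costs one line, and the argument handling lives in one place.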

########## File path: sdks/python/apache_beam/dataframe/frames.py ##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
           'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')

Review comment:
When there is a tie, a setting like keep='first' or keep='last' makes the result depend on the order of the rows, because that order decides which of the tied rows are kept. With keep='all' we may return more than n rows (as Pandas does), but we no longer have to worry about keeping (or discarding) the right ones.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
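To make the tie-breaking argument in the last review comment concrete, here is a small pandas sketch (the data is made up for illustration): the same rows in two different orders give different keep='first' results but identical keep='all' results, at the cost of returning more than n rows.

```python
import pandas as pd

# Three rows tie for the second-largest value (7).
a = pd.Series([10, 7, 7, 7, 1], index=['v', 'w', 'x', 'y', 'z'])
b = a.iloc[::-1]  # the same rows in reversed order

# keep='first' breaks the tie by position, so the chosen rows depend on order.
first_a = set(a.nlargest(2, keep='first').index)  # {'v', 'w'}
first_b = set(b.nlargest(2, keep='first').index)  # {'v', 'y'}

# keep='all' returns every tied row, so row order no longer matters,
# even though this gives 4 rows for n=2.
all_a = set(a.nlargest(2, keep='all').index)
all_b = set(b.nlargest(2, keep='all').index)
print(all_a == all_b)  # True
```

In a distributed setting the row order across partitions is not well defined, which is why only keep='all' yields a result that does not depend on it.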