robertwb commented on a change in pull request #12516:
URL: https://github.com/apache/beam/pull/12516#discussion_r473172807



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):

Review comment:
       All of these public methods are proxies for the (extensively documented) 
methods of the same name in Pandas. 

##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -121,6 +130,12 @@ def test_series_tests(self):
             'pandas.core.series.Series.cumsum': ['*'],
             'pandas.core.series.Series.cumprod': ['*'],
             'pandas.core.series.Series.diff': ['*'],
+            'pandas.core.series.Series.nlargest': [
+                "s.nlargest(3, keep='last')"

Review comment:
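       Same as for the `DataFrame.nlargest` entry: this is skipping a specific 
test that exists in the docstring.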

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nlargest-per-partition',
+        lambda df: df.nlargest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(

Review comment:
       Technically, it's the Expressions that they wrap that are passed down to 
workers for execution, but yes, that's the basic idea. 

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -200,9 +236,178 @@ def dropna(self, axis, **kwargs):
   items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
       'non-lazy')
 
+  def _cols_as_temporary_index(self, cols, suffix=''):
+    original_index_names = list(self._expr.proxy().index.names)
+    new_index_names = [
+        '__apache_beam_temp_%d_%s' % (ix, suffix)
+        for (ix, _) in enumerate(original_index_names)]
+    def revert(df):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'join_restoreindex',
+              lambda df:
+                  df.reset_index().set_index(new_index_names)
+                  .rename_axis(index=original_index_names, copy=False),
+              [df._expr],
+              preserves_partition_by=partitionings.Nothing(),
+              requires_partition_by=partitionings.Nothing()))
+    reindexed = frame_base.DeferredFrame.wrap(

Review comment:
       Good question. Only because it was always invoked right away. Updated to 
be a method for symmetry. 

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nlargest-per-partition',
+        lambda df: df.nlargest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'nlargest',
+              lambda df: df.nlargest(**kwargs), [per_partition],
+              preserves_partition_by=partitionings.Singleton(),
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  def nsmallest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nsmallest-per-partition',
+        lambda df: df.nsmallest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'nsmallest',
+              lambda df: df.nsmallest(**kwargs), [per_partition],
+              preserves_partition_by=partitionings.Singleton(),
+              requires_partition_by=partitionings.Singleton()))
+
+  rename_axis = frame_base._elementwise_method('rename_axis')

Review comment:
       Given I'm going for full coverage, I tried to keep them in basic 
alphabetical order, unless there is an obvious relationship between methods. 
   
   As for `_elementwise_method`, basically it creates a method that returns 
`expressions.ComputedExpression(name, lambda df: getattr(df, name)(args), 
self._expr, preserves=Singleton(), requires=Nothing())` but also handles a mix 
of deferred and non-deferred arguments. It was such a common pattern that I 
made a helper for it. 

##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -43,6 +43,13 @@ def test_dataframe_tests(self):
             'pandas.core.frame.DataFrame.itertuples': ['*'],
             'pandas.core.frame.DataFrame.iterrows': ['*'],
             'pandas.core.frame.DataFrame.iteritems': ['*'],
+            'pandas.core.frame.DataFrame.nlargest': [
+                "df.nlargest(3, 'population', keep='last')"

Review comment:
       This is skipping a specific test that exists in the docstring. 

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')

Review comment:
       When there is a tie, a specification like "first" or "last" makes the 
result depend on the ordering of the elements, because that ordering decides 
which of the tied elements are kept. With "all" we may return more than n 
elements (as in Pandas), but we no longer have to worry about keeping (or 
discarding) the right ones. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to