kennknowles commented on a change in pull request #14992:
URL: https://github.com/apache/beam/pull/14992#discussion_r651203658



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3210,17 +3230,15 @@ class _DeferredGroupByCols(frame_base.DeferredFrame):
   diff = frame_base._elementwise_method('diff', base=DataFrameGroupBy)
   fillna = frame_base._elementwise_method('fillna', base=DataFrameGroupBy)
   filter = frame_base._elementwise_method('filter', base=DataFrameGroupBy)
-  first = frame_base.wont_implement_method(
-      DataFrameGroupBy, 'first', reason="order-sensitive")
+  first = frame_base._elementwise_method('first', base=DataFrameGroupBy)

Review comment:
       Putting it here cause it to do a `first` globally?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -253,6 +253,36 @@ def fillna(self, value, method, axis, limit, **kwargs):
   backfill = _fillna_alias('backfill')
   pad = _fillna_alias('pad')
 
+  @frame_base.with_docs_from(pd.DataFrame)
+  def first(self, offset):
+    per_partition = expressions.ComputedExpression(
+        'first-per-partition',
+        lambda df: df.sort_index().first(offset=offset), [self._expr],
+        preserves_partition_by=partitionings.Arbitrary(),
+        requires_partition_by=partitionings.Arbitrary())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'first',
+              lambda df: df.sort_index().first(offset=offset), [per_partition],
+              preserves_partition_by=partitionings.Arbitrary(),
+              requires_partition_by=partitionings.Singleton()))

Review comment:
       This is the bit that causes it to feed in the whole dataframe to the 
DoFn at once?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -253,6 +253,36 @@ def fillna(self, value, method, axis, limit, **kwargs):
   backfill = _fillna_alias('backfill')
   pad = _fillna_alias('pad')
 
+  @frame_base.with_docs_from(pd.DataFrame)
+  def first(self, offset):
+    per_partition = expressions.ComputedExpression(
+        'first-per-partition',
+        lambda df: df.sort_index().first(offset=offset), [self._expr],
+        preserves_partition_by=partitionings.Arbitrary(),

Review comment:
       Assuming this means that it preserves no partitioning?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3037,10 +3059,8 @@ def do_partition_apply(df):
   tail = frame_base.wont_implement_method(
       DataFrameGroupBy, 'tail', explanation=_PEEK_METHOD_EXPLANATION)
 
-  first = frame_base.wont_implement_method(
-      DataFrameGroupBy, 'first', reason='order-sensitive')
-  last = frame_base.wont_implement_method(
-      DataFrameGroupBy, 'last', reason='order-sensitive')
+  first = frame_base.not_implemented_method('first')

Review comment:
       What's the difference here? I was looking at adjacent lines to see if 
there were any analogous functions, to understand at a high level, but I don't 
see any.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to