TheNeuralBit commented on a change in pull request #16706:
URL: https://github.com/apache/beam/pull/16706#discussion_r798106419



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3986,29 +3996,78 @@ def apply(self, func, *args, **kwargs):
     fn_input = project(self._ungrouped_with_index.proxy().reset_index(
         grouping_columns, drop=True))
     result = func(fn_input)
-    if isinstance(result, pd.core.generic.NDFrame):
-      if result.index is fn_input.index:
-        proxy = result
+    def index_to_arrays(index):
+      return [index.get_level_values(level)
+              for level in range(index.nlevels)]
+
+
+    # By default do_apply will just call pandas apply()
+    # We override it below if necessary
+    do_apply = lambda gb: gb.apply(func, *args, **kwargs)
+
+    if (isinstance(result, pd.core.generic.NDFrame) and
+        result.index is fn_input.index):
+      # Special case where apply fn is a transform
+      # Note we trust that if the user fn produces a proxy with the identical
+      # index, it will produce results with identical indexes at execution
+      # time too
+      proxy = result
+    elif isinstance(result, pd.DataFrame):
+      # apply fn is not a transform, we need to make sure the original index
+      # values are prepended to the result's index
+      proxy = result[:0]
+
+      # First adjust proxy
+      proxy.index = pd.MultiIndex.from_arrays(
+          index_to_arrays(self._ungrouped.proxy().index) +
+          index_to_arrays(proxy.index),
+          names=self._ungrouped.proxy().index.names + proxy.index.names)
+
+
+      # Then override do_apply function
+      new_index_names = self._ungrouped.proxy().index.names
+      if len(new_index_names) > 1:
+        def add_key_index(key, df):
+          # df is a dataframe or Series representing the result of func for
+          # a single key
+          # key is a tuple with the MultiIndex values for this key
+          df.index = pd.MultiIndex.from_arrays(
+              [[key[i]] * len(df) for i in range(len(new_index_names))] +
+              index_to_arrays(df.index),
+              names=new_index_names + df.index.names)
+          return df
       else:
-        proxy = result[:0]
-
-        def index_to_arrays(index):
-          return [index.get_level_values(level)
-                  for level in range(index.nlevels)]
-
-        # The final result will have the grouped indexes + the indexes from the
-        # result
-        proxy.index = pd.MultiIndex.from_arrays(
-            index_to_arrays(self._ungrouped.proxy().index) +
-            index_to_arrays(proxy.index),
-            names=self._ungrouped.proxy().index.names + proxy.index.names)
+        def add_key_index(key, df):
+          # df is a dataframe or Series representing the result of func for
+          # a single key
+          df.index = pd.MultiIndex.from_arrays(
+              [[key] * len(df)] + index_to_arrays(df.index),
+              names=new_index_names + df.index.names)
+          return df
+
+
+      do_apply = lambda gb: pd.concat([
+          add_key_index(k, func(gb.get_group(k), *args, **kwargs))
+          for k in gb.groups.keys()])
+    elif isinstance(result, pd.Series):
+      if isinstance(fn_input, pd.DataFrame):
+        # DataFrameGroupBy
+        dtype = pd.Series([result]).dtype
+        proxy = pd.DataFrame(columns=result.index,
+                             dtype=result.dtype,
+                             index=self._ungrouped.proxy().index)

Review comment:
       Yeah, it's a bit surprising. It turns out that in this case pandas transposes 
the Series - its index values become the columns.
   
   ```
   In [3]: df
   Out[3]: 
        brand style  rating
   0  Yum Yum   cup     4.0
   1  Yum Yum   cup     4.0
   2  Indomie   cup     3.5
   3  Indomie  pack    15.0
   4  Indomie  pack     5.0
   
   In [5]: df.groupby('style').apply(lambda df: df.rating.describe())
   Out[5]: 
   rating  count       mean       std  min   25%   50%   75%   max
   style                                                          
   cup       3.0   3.833333  0.288675  3.5  3.75   4.0   4.0   4.0
   pack      2.0  10.000000  7.071068  5.0  7.50  10.0  12.5  15.0
   ```
   
   Compare this to the case where `fn_input` is a `Series`. In this case the 
output is still a `Series`:
   
   ```
   In [6]: df.groupby('style').rating.apply(lambda s: s.describe())
   Out[6]: 
   style       
   cup    count     3.000000
          mean      3.833333
          std       0.288675
          min       3.500000
          25%       3.750000
          50%       4.000000
          75%       4.000000
          max       4.000000
   pack   count     2.000000
          mean     10.000000
          std       7.071068
          min       5.000000
          25%       7.500000
          50%      10.000000
          75%      12.500000
          max      15.000000
   Name: rating, dtype: float64
   ```
   
   I'll add a comment clarifying this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to