[ 
https://issues.apache.org/jira/browse/BEAM-13605?focusedWorklogId=719775&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-719775
 ]

ASF GitHub Bot logged work on BEAM-13605:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Feb/22 23:55
            Start Date: 02/Feb/22 23:55
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request 
#16706:
URL: https://github.com/apache/beam/pull/16706#discussion_r798110325



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3986,29 +3996,78 @@ def apply(self, func, *args, **kwargs):
     fn_input = project(self._ungrouped_with_index.proxy().reset_index(
         grouping_columns, drop=True))
     result = func(fn_input)
-    if isinstance(result, pd.core.generic.NDFrame):
-      if result.index is fn_input.index:
-        proxy = result
+    def index_to_arrays(index):
+      return [index.get_level_values(level)
+              for level in range(index.nlevels)]
+
+
+    # By default do_apply will just call pandas apply()
+    # We override it below if necessary
+    do_apply = lambda gb: gb.apply(func, *args, **kwargs)
+
+    if (isinstance(result, pd.core.generic.NDFrame) and
+        result.index is fn_input.index):
+      # Special case where apply fn is a transform
+      # Note we trust that if the user fn produces a proxy with the identical
+      # index, it will produce results with identical indexes at execution
+      # time too
+      proxy = result
+    elif isinstance(result, pd.DataFrame):
+      # apply fn is not a transform, we need to make sure the original index
+      # values are prepended to the result's index
+      proxy = result[:0]
+
+      # First adjust proxy
+      proxy.index = pd.MultiIndex.from_arrays(
+          index_to_arrays(self._ungrouped.proxy().index) +
+          index_to_arrays(proxy.index),
+          names=self._ungrouped.proxy().index.names + proxy.index.names)
+
+
+      # Then override do_apply function
+      new_index_names = self._ungrouped.proxy().index.names
+      if len(new_index_names) > 1:
+        def add_key_index(key, df):
+          # df is a dataframe or Series representing the result of func for
+          # a single key
+          # key is a tuple with the MultiIndex values for this key
+          df.index = pd.MultiIndex.from_arrays(
+              [[key[i]] * len(df) for i in range(len(new_index_names))] +
+              index_to_arrays(df.index),
+              names=new_index_names + df.index.names)
+          return df
       else:
-        proxy = result[:0]
-
-        def index_to_arrays(index):
-          return [index.get_level_values(level)
-                  for level in range(index.nlevels)]
-
-        # The final result will have the grouped indexes + the indexes from the
-        # result
-        proxy.index = pd.MultiIndex.from_arrays(
-            index_to_arrays(self._ungrouped.proxy().index) +
-            index_to_arrays(proxy.index),
-            names=self._ungrouped.proxy().index.names + proxy.index.names)
+        def add_key_index(key, df):
+          # df is a dataframe or Series representing the result of func for
+          # a single key
+          df.index = pd.MultiIndex.from_arrays(
+              [[key] * len(df)] + index_to_arrays(df.index),
+              names=new_index_names + df.index.names)
+          return df
+
+
+      do_apply = lambda gb: pd.concat([
+          add_key_index(k, func(gb.get_group(k), *args, **kwargs))
+          for k in gb.groups.keys()])
+    elif isinstance(result, pd.Series):
+      if isinstance(fn_input, pd.DataFrame):
+        # DataFrameGroupBy
+        dtype = pd.Series([result]).dtype
+        proxy = pd.DataFrame(columns=result.index,
+                             dtype=result.dtype,
+                             index=self._ungrouped.proxy().index)
+      elif isinstance(fn_input, pd.Series):
+        # SeriesGroupBy
+        proxy = pd.Series(dtype=result.dtype,
+                          name=result.name,
+                          index=index_to_arrays(self._ungrouped.proxy().index) 
+
+                                index_to_arrays(result[:0].index))

Review comment:
       I added some comments to explain both of these cases (and examples in 
the next comment). Does that help?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 719775)
    Time Spent: 6.5h  (was: 6h 20m)

> Support pandas 1.4.0 in the DataFrame API
> -----------------------------------------
>
>                 Key: BEAM-13605
>                 URL: https://issues.apache.org/jira/browse/BEAM-13605
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-dataframe
>            Reporter: Brian Hulette
>            Assignee: Andy Ye
>            Priority: P2
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> 1.4.0rc1 is out now, we should verify it works with the DataFrame API, then 
> increase the version range to allow it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to