[ 
https://issues.apache.org/jira/browse/BEAM-11052?focusedWorklogId=498799&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-498799
 ]

ASF GitHub Bot logged work on BEAM-11052:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/20 23:41
            Start Date: 09/Oct/20 23:41
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r502713670



##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -138,21 +142,47 @@ def extract_input(placeholder):
 
   placeholders = frozenset.union(
       frozenset(), *[df._expr.placeholders() for df in dataframes])
-  results = {p: extract_input(p)
-             for p in placeholders
-             } | label >> transforms._DataframeExpressionsTransform(
-                 dict((ix, df._expr) for ix, df in enumerate(
-                     dataframes)))  # type: Dict[Any, pvalue.PCollection]
+
+  # Exclude any dataframes that have already been converted to PCollections.
+  # We only want to convert each DF expression once, then re-use.
+  new_dataframes = [
+      df for df in dataframes if df._expr not in TO_PCOLLECTION_CACHE
+  ]
+  new_results = {p: extract_input(p)
+                 for p in placeholders
+                 } | label >> transforms._DataframeExpressionsTransform(
+                     dict(
+                         (ix, df._expr) for ix, df in enumerate(new_dataframes))
+                 )  # type: Dict[Any, pvalue.PCollection]
+
+  TO_PCOLLECTION_CACHE.update(
+      {new_dataframes[ix]._expr: pc
+       for ix, pc in new_results.items()})
+
+  raw_results = {
+      ix: TO_PCOLLECTION_CACHE[df._expr]
+      for ix, df in enumerate(dataframes)
+  }
 
   if yield_elements == "schemas":
     results = {
-        key: pc
-        | "Unbatch '%s'" % dataframes[key]._expr._id >> schemas.UnbatchPandas(
-            dataframes[key]._expr.proxy(), include_indexes=include_indexes)
-        for (key, pc) in results.items()
+        ix: _make_unbatched_pcoll(pc, dataframes[ix]._expr, include_indexes)
+        for ix,
+        pc in raw_results.items()
     }
+  else:
+    results = raw_results
 
   if len(results) == 1 and not always_return_tuple:
     return results[0]
   else:
     return tuple(value for key, value in sorted(results.items()))
+
+
+memoize = functools.lru_cache(maxsize=None)
+
+
+@memoize

Review comment:
       Again, this grows without bound.
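
       A minimal, self-contained sketch (stand-in names, not the PR's code) of the difference at issue: `functools.lru_cache(maxsize=None)` retains every entry for the life of the process, while a finite maxsize evicts least-recently-used entries.

           import functools

           # maxsize=None never evicts, so a long-lived process accumulates entries;
           # a finite maxsize keeps the cache bounded at the cost of re-computing
           # evicted results. The value 128 here is purely illustrative.
           bounded_memoize = functools.lru_cache(maxsize=128)


           @bounded_memoize
           def expensive(key):
             # Stand-in for real work such as building an unbatching transform.
             return key * 2


           for i in range(1000):
             expensive(i % 300)
           # currsize stays <= 128 even though 300 distinct keys were seen.
           print(expensive.cache_info())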

##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -138,21 +142,47 @@ def extract_input(placeholder):
 
   placeholders = frozenset.union(
       frozenset(), *[df._expr.placeholders() for df in dataframes])
-  results = {p: extract_input(p)
-             for p in placeholders
-             } | label >> transforms._DataframeExpressionsTransform(
-                 dict((ix, df._expr) for ix, df in enumerate(
-                     dataframes)))  # type: Dict[Any, pvalue.PCollection]
+
+  # Exclude any dataframes that have already been converted to PCollections.
+  # We only want to convert each DF expression once, then re-use.
+  new_dataframes = [
+      df for df in dataframes if df._expr not in TO_PCOLLECTION_CACHE
+  ]
+  new_results = {p: extract_input(p)
+                 for p in placeholders
+                 } | label >> transforms._DataframeExpressionsTransform(
+                     dict(
+                         (ix, df._expr) for ix, df in enumerate(new_dataframes))
+                 )  # type: Dict[Any, pvalue.PCollection]
+
+  TO_PCOLLECTION_CACHE.update(
+      {new_dataframes[ix]._expr: pc
+       for ix, pc in new_results.items()})
+
+  raw_results = {
+      ix: TO_PCOLLECTION_CACHE[df._expr]
+      for ix, df in enumerate(dataframes)
+  }
 
   if yield_elements == "schemas":
     results = {
-        key: pc
-        | "Unbatch '%s'" % dataframes[key]._expr._id >> schemas.UnbatchPandas(
-            dataframes[key]._expr.proxy(), include_indexes=include_indexes)
-        for (key, pc) in results.items()
+        ix: _make_unbatched_pcoll(pc, dataframes[ix]._expr, include_indexes)
+        for ix,

Review comment:
       Put ()'s around `ix, pc` for better formatting.
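
       For illustration, a runnable before/after of the suggestion with stand-in data (the PR's own names are not needed here):

           raw_results = {0: 'a', 1: 'b'}  # stand-in data


           def make_result(pc):  # stand-in for _make_unbatched_pcoll
             return pc.upper()


           # Without parentheses, yapf splits the tuple target across lines:
           #     for ix,
           #     pc in raw_results.items()
           # Parenthesizing keeps the comprehension on one line per clause:
           results = {ix: make_result(pc) for (ix, pc) in raw_results.items()}
           print(results)  # {0: 'A', 1: 'B'}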

##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -67,6 +68,9 @@ def to_dataframe(
       expressions.PlaceholderExpression(proxy, pcoll))
 
 
+TO_PCOLLECTION_CACHE = {}

Review comment:
       I'm actually more worried about global caches in production than in testing--the expressions themselves should not collide between pipelines.
   
   Maybe we could use a weakref.WeakKeyDictionary.
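
   A hedged sketch of that suggestion (stand-in expression class, not Beam's): keying the module-level cache with `weakref.WeakKeyDictionary` lets an entry disappear once nothing else references its expression, so the cache cannot grow for the life of the process.

       import weakref

       TO_PCOLLECTION_CACHE = weakref.WeakKeyDictionary()


       class Expr(object):  # stand-in for a deferred dataframe expression
         pass


       expr = Expr()
       TO_PCOLLECTION_CACHE[expr] = 'pcollection for expr'
       print(len(TO_PCOLLECTION_CACHE))  # 1

       del expr  # last strong reference gone
       # On CPython the entry is collected immediately via refcounting.
       print(len(TO_PCOLLECTION_CACHE))  # 0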




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 498799)
    Time Spent: 0.5h  (was: 20m)

> to_pcollection should memoize on DF expressions
> -----------------------------------------------
>
>                 Key: BEAM-11052
>                 URL: https://issues.apache.org/jira/browse/BEAM-11052
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> We should always return the same PCollection when `to_pcollection` is called 
> multiple times with the same input; this will allow for better integration 
> with interactive notebooks.
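
A minimal stand-in (not Beam code) for the contract described above: converting the same expression twice should hand back the identical object.

    _cache = {}


    def to_pcollection_stub(expr):
      # Real code would build and cache a PCollection keyed on the expression.
      if expr not in _cache:
        _cache[expr] = object()
      return _cache[expr]


    e = 'shared-expression'
    assert to_pcollection_stub(e) is to_pcollection_stub(e)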



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
