[https://issues.apache.org/jira/browse/BEAM-11052?focusedWorklogId=501697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-501697]
ASF GitHub Bot logged work on BEAM-11052:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Oct/20 22:18
Start Date: 16/Oct/20 22:18
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r506744373
##########
File path: sdks/python/apache_beam/dataframe/convert_test.py
##########
@@ -85,6 +85,94 @@ def test_convert(self):
assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
+ def test_convert_memoization(self):
Review comment:
This test (and the one below) seems to be largely a copy of the test
above, but we don't even need to run the pipeline to verify the behavior under
test. Perhaps it'd be better to limit it to what we're actually trying to test,
for clarity: that the ids are the same.
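A minimal sketch of the narrower test suggested here (hypothetical shape;
assumes to_pcollection returns a single PCollection for a single input and
memoizes on the expression id, as this PR intends):

    def test_convert_memoization_ids(self):
      p = beam.Pipeline()
      a = pd.Series([1, 2, 3])
      pc_a = p | 'A' >> beam.Create([a])
      df_a = convert.to_dataframe(pc_a, proxy=a[:0])
      df_2a = 2 * df_a

      # Two conversions of the same expression should hit the cache and
      # return the identical object; the pipeline never needs to run.
      first = convert.to_pcollection(df_2a)
      second = convert.to_pcollection(df_2a)
      self.assertIs(first, second)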
##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -138,19 +161,36 @@ def extract_input(placeholder):
placeholders = frozenset.union(
frozenset(), *[df._expr.placeholders() for df in dataframes])
- results = {p: extract_input(p)
- for p in placeholders
- } | label >> transforms._DataframeExpressionsTransform(
- dict((ix, df._expr) for ix, df in enumerate(
- dataframes))) # type: Dict[Any, pvalue.PCollection]
+
+ # Exclude any dataframes that have already been converted to PCollections.
+ # We only want to convert each DF expression once, then re-use.
+ new_dataframes = [
+ df for df in dataframes if df._expr._id not in TO_PCOLLECTION_CACHE
+ ]
+ new_results = {p: extract_input(p)
+ for p in placeholders
+ } | label >> transforms._DataframeExpressionsTransform(
+ dict(
Review comment:
Dict comprehension (for consistency)?
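i.e., something like the following (a sketch; new_dataframes is assumed from
the surrounding diff):

    # Before: generator expression passed to dict()
    dict((ix, df._expr) for ix, df in enumerate(new_dataframes))
    # After: a dict comprehension, consistent with the comprehensions above
    {ix: df._expr for ix, df in enumerate(new_dataframes)}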
##########
File path: sdks/python/apache_beam/dataframe/convert_test.py
##########
@@ -85,6 +85,94 @@ def test_convert(self):
assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
+ def test_convert_memoization(self):
+ with beam.Pipeline() as p:
+ a = pd.Series([1, 2, 3])
+ b = pd.Series([100, 200, 300])
+
+ pc_a = p | 'A' >> beam.Create([a])
+ pc_b = p | 'B' >> beam.Create([b])
+
+ df_a = convert.to_dataframe(pc_a, proxy=a[:0])
+ df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+
+ df_2a = 2 * df_a
+ df_3a = 3 * df_a
+ df_ab = df_a * df_b
+
+ # Converting multiple results at a time can be more efficient.
+ pc_2a_, pc_ab_ = convert.to_pcollection(df_2a, df_ab)
+ # Converting the same expressions should yield the same PCollections
+ pc_3a, pc_2a, pc_ab = convert.to_pcollection(df_3a, df_2a, df_ab)
+
+ self.assertEqual(id(pc_2a), id(pc_2a_))
+ self.assertEqual(id(pc_ab), id(pc_ab_))
+
+ assert_that(pc_2a, equal_to(list(2 * a)), label='Check2a')
+ assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
+ assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
+
+ def test_convert_memoization_yield_pandas(self):
+ with beam.Pipeline() as p:
+ a = pd.Series([1, 2, 3])
+ b = pd.Series([100, 200, 300])
+
+ pc_a = p | 'A' >> beam.Create([a])
+ pc_b = p | 'B' >> beam.Create([b])
+
+ df_a = convert.to_dataframe(pc_a, proxy=a[:0])
+ df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+
+ df_2a = 2 * df_a
+ df_3a = 3 * df_a
+ df_ab = df_a * df_b
+
+ # Converting multiple results at a time can be more efficient.
+ pc_2a_, pc_ab_ = convert.to_pcollection(df_2a, df_ab,
+ yield_elements='pandas')
Review comment:
Move these tests into the former, so you can also assert that
`to_pcollection(x, yield_elements='schema') != to_pcollection(x,
yield_elements='pandas')` (i.e. no accidental cross-cache contamination).
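Sketched out, the extra check in the merged test might read as follows
(assumes the yield_elements keyword used in this PR, and that a single input
returns a single PCollection):

    pc_schema = convert.to_pcollection(df_2a, yield_elements='schema')
    pc_pandas = convert.to_pcollection(df_2a, yield_elements='pandas')
    # Same expression, different yield_elements: the caches must stay
    # separate, so these should be distinct PCollections.
    self.assertIsNot(pc_schema, pc_pandas)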
##########
File path: sdks/python/apache_beam/dataframe/convert_test.py
##########
@@ -85,6 +85,94 @@ def test_convert(self):
assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
+ def test_convert_memoization(self):
+ with beam.Pipeline() as p:
+ a = pd.Series([1, 2, 3])
+ b = pd.Series([100, 200, 300])
+
+ pc_a = p | 'A' >> beam.Create([a])
+ pc_b = p | 'B' >> beam.Create([b])
+
+ df_a = convert.to_dataframe(pc_a, proxy=a[:0])
+ df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+
+ df_2a = 2 * df_a
+ df_3a = 3 * df_a
+ df_ab = df_a * df_b
+
+ # Converting multiple results at a time can be more efficient.
+ pc_2a_, pc_ab_ = convert.to_pcollection(df_2a, df_ab)
+ # Converting the same expressions should yield the same PCollections
+ pc_3a, pc_2a, pc_ab = convert.to_pcollection(df_3a, df_2a, df_ab)
+
+ self.assertEqual(id(pc_2a), id(pc_2a_))
Review comment:
assertIs?
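i.e., with a direct identity assertion (a sketch of the suggested change):

    self.assertIs(pc_2a, pc_2a_)
    self.assertIs(pc_ab, pc_ab_)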
##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -67,7 +69,28 @@ def to_dataframe(
expressions.PlaceholderExpression(proxy, pcoll))
+# PCollections generated by to_pcollection are memoized.
+# A WeakValueDictionary is used so the caches are cleaned up with their
+# parent pipelines.
Review comment:
Nice. Perhaps it's worth noting that the pipeline (indirectly) holds
references to the transforms which keep both the collections and expressions
alive. (Keeping the expressions alive is important to ensure their ids never
get accidentally re-used.)
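For reference, a minimal illustration of the WeakValueDictionary lifetime
behavior being relied on here (plain Python, not Beam's actual cache code;
immediate cleanup assumes CPython's reference counting):

    import weakref

    class Result(object):
      pass

    cache = weakref.WeakValueDictionary()
    result = Result()
    cache['expr_id'] = result
    assert 'expr_id' in cache
    # Drop the last strong reference (in Beam, the pipeline's transforms
    # play this role) and the cache entry disappears with it.
    del result
    assert 'expr_id' not in cache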
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 501697)
Time Spent: 1h 20m (was: 1h 10m)
> to_pcollection should memoize on DF expressions
> -----------------------------------------------
>
> Key: BEAM-11052
> URL: https://issues.apache.org/jira/browse/BEAM-11052
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> We should always return the same PCollection when `to_pcollection` is called
> multiple times with the same input; this will allow for better integration
> with interactive notebooks.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)