[ https://issues.apache.org/jira/browse/BEAM-5792?focusedWorklogId=157167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-157167 ]

ASF GitHub Bot logged work on BEAM-5792:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Oct/18 20:14
            Start Date: 22/Oct/18 20:14
    Worklog Time Spent: 10m 
    Work Description: robertwb closed pull request #6749: [BEAM-5792] Implement Create in terms of Impulse + Reshuffle.
URL: https://github.com/apache/beam/pull/6749
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index b00ba21ee28..63546f8a692 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1972,26 +1972,35 @@ def get_output_type(self):
             self.infer_output_type(None))
 
   def expand(self, pbegin):
-    from apache_beam.io import iobase
     assert isinstance(pbegin, pvalue.PBegin)
-    self.pipeline = pbegin.pipeline
-    coder = typecoders.registry.get_coder(self.get_output_type())
-    debug_options = self.pipeline._options.view_as(DebugOptions)
     # Must guard against this as some legacy runners don't implement impulse.
+    debug_options = pbegin.pipeline._options.view_as(DebugOptions)
     fn_api = (debug_options.experiments
               and 'beam_fn_api' in debug_options.experiments)
-    # Avoid the "redistributing" reshuffle for 0 and 1 element Creates.
-    # These special cases are often used in building up more complex
-    # transforms (e.g. Write).
-    if fn_api and len(self.values) == 0:
-      return pbegin | Impulse() | FlatMap(
-          lambda _: ()).with_output_types(self.get_output_type())
-    elif fn_api and len(self.values) == 1:
-      serialized_value = coder.encode(self.values[0])
-      return pbegin | Impulse() | Map(
-          lambda _: coder.decode(serialized_value)).with_output_types(
-              self.get_output_type())
+    if fn_api:
+      coder = typecoders.registry.get_coder(self.get_output_type())
+      serialized_values = [coder.encode(v) for v in self.values]
+      # Avoid the "redistributing" reshuffle for 0 and 1 element Creates.
+      # These special cases are often used in building up more complex
+      # transforms (e.g. Write).
+
+      class MaybeReshuffle(PTransform):
+        def expand(self, pcoll):
+          if len(serialized_values) > 1:
+            from apache_beam.transforms.util import Reshuffle
+            return pcoll | Reshuffle()
+          else:
+            return pcoll
+      return (
+          pbegin
+          | Impulse()
+          | FlatMap(lambda _: serialized_values)
+          | MaybeReshuffle()
+          | Map(coder.decode).with_output_types(self.get_output_type()))
     else:
+      self.pipeline = pbegin.pipeline
+      from apache_beam.io import iobase
+      coder = typecoders.registry.get_coder(self.get_output_type())
       source = self._create_source_from_iterable(self.values, coder)
       return (pbegin.pipeline
               | iobase.Read(source).with_output_types(self.get_output_type()))
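The Fn API branch of the new `expand` can be modeled outside Beam to make its control flow easier to follow. The sketch below is a minimal pure-Python approximation, not Beam code: `impulse`, `reshuffle`, and `create_via_impulse` are hypothetical helpers, `pickle` stands in for the registry coder, and the "reshuffle" simply assigns each encoded element to a random bundle instead of performing a real keyed shuffle. It follows the diff's structure: encode the values up front, emit them all from the single impulse element, redistribute only when there is more than one element, then decode.

```python
import pickle
import random
from typing import Any, Callable, Iterable, List


def impulse() -> List[bytes]:
    # Hypothetical stand-in for Impulse: exactly one empty-bytes element.
    return [b""]


def reshuffle(elements: List[bytes], num_workers: int = 4,
              seed: int = 0) -> List[List[bytes]]:
    # Hypothetical stand-in for Reshuffle: send each element to a random
    # "worker" bundle, so individual elements (not splits) are redistributed.
    rng = random.Random(seed)
    bundles: List[List[bytes]] = [[] for _ in range(num_workers)]
    for e in elements:
        bundles[rng.randrange(num_workers)].append(e)
    return bundles


def create_via_impulse(values: Iterable[Any],
                       encode: Callable[[Any], bytes] = pickle.dumps,
                       decode: Callable[[bytes], Any] = pickle.loads,
                       num_workers: int = 4) -> List[List[Any]]:
    # Encode eagerly, mirroring `serialized_values` in the diff.
    serialized_values = [encode(v) for v in values]
    # FlatMap(lambda _: serialized_values) applied to the single impulse element.
    flat = [sv for _ in impulse() for sv in serialized_values]
    # MaybeReshuffle: skip redistribution for 0- and 1-element inputs.
    if len(serialized_values) > 1:
        bundles = reshuffle(flat, num_workers)
    else:
        bundles = [flat]
    # Map(coder.decode) within each bundle.
    return [[decode(sv) for sv in bundle] for bundle in bundles]
```

For example, `create_via_impulse([])` yields a single empty bundle and `create_via_impulse(["only"])` a single one-element bundle, matching the special cases that `MaybeReshuffle` passes through unchanged, while a longer input is spread across the hypothetical worker bundles.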


 



Issue Time Tracking
-------------------

            Worklog Id:     (was: 157167)
            Time Spent: 10m
    Remaining Estimate: 0h

> Use Impulse + Reshuffle for Create on Fn API
> --------------------------------------------
>
>                 Key: BEAM-5792
>                 URL: https://issues.apache.org/jira/browse/BEAM-5792
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Assignee: Ahmet Altay
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> There's little advantage to shuffling the splits rather than shuffling the
> elements themselves, and some real-world experience showed significant
> disadvantages.
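The issue text does not say which disadvantages were observed in practice, so the sketch below illustrates only one aspect of the splits-versus-elements trade-off: how coarse the redistributed units are. It assumes a model where a bounded source is cut into a few contiguous splits and each redistributed unit lands, whole, on a random worker. The helper names, the 3-split layout, and the worker count are illustrative assumptions, not Beam behavior.

```python
import random
from typing import List, Sequence


def split_contiguous(elements: Sequence[int], num_splits: int) -> List[List[int]]:
    # Hypothetical source splitting: cut the input into contiguous chunks.
    size = max(1, -(-len(elements) // num_splits))  # ceiling division
    return [list(elements[i:i + size]) for i in range(0, len(elements), size)]


def busiest_worker_load(units: List[List[int]], num_workers: int,
                        seed: int = 0) -> int:
    # Send each unit, whole, to a random worker (a Reshuffle-like step)
    # and report how many elements the most loaded worker received.
    rng = random.Random(seed)
    loads = [0] * num_workers
    for unit in units:
        loads[rng.randrange(num_workers)] += len(unit)
    return max(loads)


elements = list(range(1000))
num_workers = 100

# Shuffling splits: only 3 units to distribute.
by_split = busiest_worker_load(split_contiguous(elements, 3), num_workers)
# Shuffling elements: 1000 single-element units to distribute.
by_element = busiest_worker_load([[e] for e in elements], num_workers)
print(f"busiest worker, shuffling splits:   {by_split} elements")
print(f"busiest worker, shuffling elements: {by_element} elements")
```

In this model the busiest worker under split shuffling always holds at least one full split (334 of the 1000 elements here), whereas element-level shuffling keeps it close to the 10-element average, up to random variation.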



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
