TheNeuralBit commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r501970588



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):

Review comment:
       Discussed this offline. I was misunderstanding the code: I thought this 
was summing the memory usage across _partition keys_, but `for tag, parts` is 
actually aggregating across the tagged inputs. The partition keys are ignored 
(the `_` on line 424), because they're no longer needed. At this point they've 
served their purpose of distributing data across workers, so it's fine to merge 
across them.
   
   It may make sense to set the target size to 
`TARGET_PARTITION_SIZE*num_inputs`, but I'm fine without it.

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):
+    self._target_size = target_size
+
+  def start_bundle(self):
+    self._parts = collections.defaultdict(lambda: 
collections.defaultdict(list))
+    self._running_size = 0
+
+  def process(
+      self,
+      element,
+      window=beam.DoFn.WindowParam,
+      timestamp=beam.DoFn.TimestampParam):
+    _, tagged_parts = element
+    for tag, parts in tagged_parts.items():
+      for part in parts:
+        self._running_size += _total_memory_usage(part)
+      self._parts[window, timestamp][tag].extend(parts)
+    if self._running_size >= self._target_size:
+      self.finish_bundle()
+
+  def finish_bundle(self):
+    for (window, timestamp), tagged_parts in self._parts.items():
+      yield windowed_value.WindowedValue(
+          {tag: pd.concat(parts)

Review comment:
       I'm curious whether it's always beneficial to use `pd.concat` here. I was 
under the impression that it copies and re-arranges buffers into columns.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to