robertwb commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r502090932



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -166,16 +173,40 @@ def expand(self, pcolls):
           partitioned_pcoll = next(iter(pcolls.values())).pipeline | beam.Create([{}])
 
         elif self.stage.partitioning != partitionings.Nothing():
+          # Partitioning required for these operations.
+          # Compute the number of partitions to use based on estimated size.
+          if self.stage.partitioning == partitionings.Singleton():
+            # Always a single partition, don't waste time computing sizes.
+            num_partitions = 1
+          else:
+            # Estimate the sizes from the outputs of a *previous* stage such
+            # that using these estimates will not cause a fusion break.
+            input_sizes = [
+                estimate_size(input, same_stage_ok=False)
+                for input in tabular_inputs
+            ]
+            if None in input_sizes:
+              # We were unable to (cheaply) compute the size of one or more
+              # inputs.
+              num_partitions = DEFAULT_PARTITIONS
+            else:
+              num_partitions = beam.pvalue.AsSingleton(
+                  input_sizes
+                  | 'FlattenSizes' >> beam.Flatten()
+                  | 'SumSizes' >> beam.CombineGlobally(sum)

Review comment:
       Not quite. We want the size of the inputs in order to partition those
inputs; we don't care about the output size of this stage at all. Clarified in
the comment above. 
   
   Note also that these "stages" are not the same as fused executable "stages." 
In particular, these "stages" contain a (Co)GBK along with some operations that 
follow it. 
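   For concreteness, a rough sketch (not the code in this PR) of how a summed
byte estimate could turn into a partition count. The helper name and the
clamping bounds are made up for illustration; only DEFAULT_PARTITIONS and
TARGET_PARTITION_SIZE are names from the actual change, and the values assumed
here are illustrative:

```python
# Assumed values for illustration only (the real module defines its own).
DEFAULT_PARTITIONS = 1000
TARGET_PARTITION_SIZE = 1 << 23  # ~8 MB, illustrative


def partitions_for(total_input_bytes):
  # Size estimation failed for at least one input; fall back to the default.
  if total_input_bytes is None:
    return DEFAULT_PARTITIONS
  # Aim for roughly TARGET_PARTITION_SIZE bytes per partition, with an
  # arbitrary floor and ceiling to avoid degenerate partition counts.
  return max(1, min(1000, total_input_bytes // TARGET_PARTITION_SIZE + 1))
```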

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    return float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):

Review comment:
       Having thought about it, I'm going to leave it for now. The reason to 
have a bound is to limit the total amount of memory on a worker (and the amount 
of compute coupled into downstream operations), and it makes sense to apply 
that cap across inputs rather than per input. We could revisit this in the 
future. 
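   To illustrate the distinction (a sketch, not this PR's code; the helper name
is hypothetical): a single budget is shared across all inputs, rather than one
budget per input, since with per-input caps N inputs could each grow to the
target size and multiply worker memory by N.

```python
def over_budget(sizes_by_tag, target_size):
  # Cap across inputs: one running total over everything buffered so far,
  # regardless of which input (tag) it came from.
  return sum(sum(sizes) for sizes in sizes_by_tag.values()) >= target_size
```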

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    return float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.

Review comment:
       Mostly the observation, while debugging, that there were a plethora of 
tiny/empty dataframes, and the realization that this could get worse given the 
dynamic partitioning choices (which err on the side of overestimation). 
   
   I also ran some simple benchmarks and found that, for simple operations, 
cost started to become linear in the data size at around the MB range. 
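   (For reference, a sketch of that kind of measurement, not the exact
benchmark: time a trivial element-wise operation on frames of increasing size
and see where the fixed per-call overhead stops dominating, i.e. where the cost
becomes roughly linear in the data size.)

```python
import timeit

import numpy as np
import pandas as pd

for n in [10, 100, 1_000, 10_000, 100_000, 1_000_000]:
  df = pd.DataFrame({'x': np.arange(n, dtype=np.float64)})
  t = timeit.timeit(lambda: df['x'] * 2 + 1, number=100) / 100
  print('%9d rows (%9d bytes): %8.1f us' % (n, df.memory_usage().sum(), t * 1e6))
```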

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    return float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):
+    self._target_size = target_size
+
+  def start_bundle(self):
+    self._parts = collections.defaultdict(lambda: collections.defaultdict(list))
+    self._running_size = 0
+
+  def process(
+      self,
+      element,
+      window=beam.DoFn.WindowParam,
+      timestamp=beam.DoFn.TimestampParam):
+    _, tagged_parts = element
+    for tag, parts in tagged_parts.items():
+      for part in parts:
+        self._running_size += _total_memory_usage(part)
+      self._parts[window, timestamp][tag].extend(parts)
+    if self._running_size >= self._target_size:
+      self.finish_bundle()
+
+  def finish_bundle(self):
+    for (window, timestamp), tagged_parts in self._parts.items():
+      yield windowed_value.WindowedValue(
+          {tag: pd.concat(parts)

Review comment:
       How beneficial it is depends on the size of the inputs. For tiny inputs, 
the cost of copying is easily absorbed by the overhead savings of future 
operations, but for large inputs it could dominate. It may make sense to try to 
tune TARGET_PARTITION_SIZE to find the right tradeoff. 
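   (A sketch of how one might probe that tradeoff, not anything in this PR:
compare the cost of the extra pd.concat copy against the roughly constant
per-operation overhead it amortizes away, for a few candidate part sizes.)

```python
import timeit

import numpy as np
import pandas as pd

for rows_per_part in [100, 10_000, 1_000_000]:
  parts = [pd.DataFrame({'x': np.arange(rows_per_part)}) for _ in range(8)]
  concat_cost = timeit.timeit(lambda: pd.concat(parts), number=20) / 20
  # The larger the parts, the more the copy costs relative to the fixed
  # per-operation overhead that concatenating them saves downstream.
  print('%9d rows/part: concat ~%.2f ms' % (rows_per_part, concat_cost * 1e3))
```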



