robertwb commented on a change in pull request #12352:
URL: https://github.com/apache/beam/pull/12352#discussion_r468090981



##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +269,95 @@ def visit_transform(self, applied_transform):
                 transform.get_type_hints(),
                 applied_transform.full_label),
             applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+  _in_combine = False
+  combine_classes = (
+      core.CombineFn,
+      core.CombinePerKey,
+      core.CombineValuesDoFn,
+      core.CombineValues,
+      core.CombineGlobally)
+
+  def enter_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = True
+
+  def leave_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = False
+
+  def visit_transform(self, applied_transform):
+    transform = applied_transform.transform
+    if isinstance(transform, core.ParDo) and not self._in_combine:
+      # Store output type hints in current transform
+      transform.fn._runtime_output_constraints = {
+          applied_transform.full_label: self.get_output_type_hints(transform)
+      }
+
+      # Store input type hints in producer transform
+      producer = applied_transform.inputs[0].producer
+      if (hasattr(producer, 'transform') and
+          hasattr(producer.transform, 'fn') and
+          hasattr(producer.transform.fn, '_runtime_output_constraints')):

Review comment:
       We would want to add this regardless of whether the dict is already present, 
right? Perhaps that would be simpler with the method suggested above. 

##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +269,95 @@ def visit_transform(self, applied_transform):
                 transform.get_type_hints(),
                 applied_transform.full_label),
             applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+  _in_combine = False
+  combine_classes = (
+      core.CombineFn,
+      core.CombinePerKey,
+      core.CombineValuesDoFn,
+      core.CombineValues,
+      core.CombineGlobally)
+
+  def enter_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = True
+
+  def leave_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = False
+
+  def visit_transform(self, applied_transform):
+    transform = applied_transform.transform
+    if isinstance(transform, core.ParDo) and not self._in_combine:
+      # Store output type hints in current transform
+      transform.fn._runtime_output_constraints = {
+          applied_transform.full_label: self.get_output_type_hints(transform)
+      }
+
+      # Store input type hints in producer transform
+      producer = applied_transform.inputs[0].producer
+      if (hasattr(producer, 'transform') and

Review comment:
       Why could this fail? 

##########
File path: sdks/python/apache_beam/runners/worker/opcounters.py
##########
@@ -224,8 +230,25 @@ def _observable_callback_inner(value, is_encoded=False):
 
     return _observable_callback_inner
 
+  def type_check(self, value):
+    # type: (any, bool) -> None
+    for transform_label, type_constraint_tuple in 
self.output_type_constraints.items():
+      parameter_name, constraint = type_constraint_tuple
+      if constraint is not None:

Review comment:
       Yes, please do that.

##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +269,95 @@ def visit_transform(self, applied_transform):
                 transform.get_type_hints(),
                 applied_transform.full_label),
             applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+  _in_combine = False
+  combine_classes = (
+      core.CombineFn,
+      core.CombinePerKey,
+      core.CombineValuesDoFn,
+      core.CombineValues,
+      core.CombineGlobally)
+
+  def enter_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = True
+
+  def leave_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = False
+
+  def visit_transform(self, applied_transform):
+    transform = applied_transform.transform
+    if isinstance(transform, core.ParDo) and not self._in_combine:
+      # Store output type hints in current transform
+      transform.fn._runtime_output_constraints = {
+          applied_transform.full_label: self.get_output_type_hints(transform)
+      }
+
+      # Store input type hints in producer transform
+      producer = applied_transform.inputs[0].producer
+      if (hasattr(producer, 'transform') and
+          hasattr(producer.transform, 'fn') and
+          hasattr(producer.transform.fn, '_runtime_output_constraints')):
+        producer = producer.transform.fn
+        producer._runtime_output_constraints[applied_transform.full_label] \

Review comment:
       Style: in Beam we avoid backslash line breaks. You can use yapf to break 
your lines for you. (If needed, wrap the expression in parentheses `()`.) 

##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -1340,6 +1342,17 @@ def process_outputs(
         self.per_element_output_counter.add_input(0)
       return
 
+    if isinstance(results, (dict, str, unicode, bytes)):
+      results_type = type(results).__name__
+      raise TypeCheckError(
+          'Returning a %s from a ParDo or FlatMap is '
+          'discouraged. Please use list("%s") if you really '
+          'want this behavior.' % (results_type, results))
+    elif not isinstance(results, collections.Iterable):

Review comment:
       Not yet resolved? 

##########
File path: sdks/python/apache_beam/runners/worker/opcounters.py
##########
@@ -224,8 +230,25 @@ def _observable_callback_inner(value, is_encoded=False):
 
     return _observable_callback_inner
 
+  def type_check(self, value):
+    # type: (any, bool) -> None
+    for transform_label, type_constraint_tuple in 
self.output_type_constraints.items():
+      parameter_name, constraint = type_constraint_tuple
+      if constraint is not None:
+        try:
+          _check_instance_type(constraint, value, parameter_name, verbose=True)
+        except TypeCheckError as e:
+          if not transform_label.startswith('ParDo'):

Review comment:
       I don't think we want to mess with this. (Also, if we do, let's push 
this logic up into the visitor where we decide the label.)

##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -600,7 +627,8 @@ def _read_side_inputs(self, tags_and_types):
           self.name_context.step_name,
           view_options['coder'],
           i,
-          suffix='side-input')
+          suffix='side-input',
+          producer_type_hints=get_perf_runtime_type_hints(self))

Review comment:
       This isn't correct: these type hints should not be used for this 
counter. Make this an optional argument defaulting to None, and don't pass it 
here.  

##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +269,95 @@ def visit_transform(self, applied_transform):
                 transform.get_type_hints(),
                 applied_transform.full_label),
             applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+  _in_combine = False
+  combine_classes = (
+      core.CombineFn,
+      core.CombinePerKey,
+      core.CombineValuesDoFn,
+      core.CombineValues,
+      core.CombineGlobally)
+
+  def enter_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = True
+
+  def leave_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = False
+
+  def visit_transform(self, applied_transform):
+    transform = applied_transform.transform
+    if isinstance(transform, core.ParDo) and not self._in_combine:
+      # Store output type hints in current transform
+      transform.fn._runtime_output_constraints = {
+          applied_transform.full_label: self.get_output_type_hints(transform)
+      }
+
+      # Store input type hints in producer transform
+      producer = applied_transform.inputs[0].producer
+      if (hasattr(producer, 'transform') and
+          hasattr(producer.transform, 'fn') and

Review comment:
       It would be cleaner to check `isinstance(transform, core.ParDo)`. 
   
   Even better would be to add a method like `_add_producer_type_constraint` to 
PTransform, with an empty default implementation that ParDo overrides. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to