robertwb commented on a change in pull request #12352:
URL: https://github.com/apache/beam/pull/12352#discussion_r464679106



##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -238,6 +247,7 @@ def __init__(self,
     self.execution_context = None  # type: Optional[ExecutionContext]
     self.consumers = collections.defaultdict(
         list)  # type: DefaultDict[int, List[Operation]]
+    self.producer = None

Review comment:
       Where is this used? 

##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -1340,6 +1342,17 @@ def process_outputs(
         self.per_element_output_counter.add_input(0)
       return
 
+    if isinstance(results, (dict, str, unicode, bytes)):
+      results_type = type(results).__name__
+      raise TypeCheckError(
+          'Returning a %s from a ParDo or FlatMap is '
+          'discouraged. Please use list("%s") if you really '
+          'want this behavior.' % (results_type, results))
+    elif not isinstance(results, collections.Iterable):

Review comment:
       This check is slow. Let's guard this or remove it (as it will still fail 
below).

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -188,7 +188,8 @@ def __init__(self,
             self.name_context.step_name,
             0,
             next(iter(itervalues(consumers))),
-            self.windowed_coder)
+            self.windowed_coder,
+            self)

Review comment:
       Generally it's preferable not to pass something big like `self`, but 
rather just the subset of information that's required here (e.g. the dict of 
type hint sources to type hints). This will also be needed for cases where 
different outputs have different types. 

##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +268,71 @@ def visit_transform(self, applied_transform):
                 transform.get_type_hints(),
                 applied_transform.full_label),
             applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+  _in_combine = False
+  combine_classes = (
+      core.CombineFn,
+      core.CombinePerKey,
+      core.CombineValuesDoFn,
+      core.CombineValues,
+      core.CombineGlobally)
+
+  def enter_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = True
+
+  def leave_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = False
+
+  def visit_transform(self, applied_transform):
+    transform = applied_transform.transform
+    if isinstance(transform, core.ParDo):
+      if not self._in_combine:
+        transform.fn._full_label = applied_transform.full_label
+        self.store_type_hints(transform)
+
+  def store_type_hints(self, transform):
+    type_hints = transform.get_type_hints()
+
+    input_types = None
+    if type_hints.input_types:
+      normal_hints, kwarg_hints = type_hints.input_types
+
+      if kwarg_hints:
+        input_types = kwarg_hints
+      if normal_hints:
+        input_types = normal_hints
+
+    output_types = None
+    if type_hints.output_types:
+      normal_hints, kwarg_hints = type_hints.output_types
+
+      if kwarg_hints:
+        output_types = kwarg_hints
+      if normal_hints:
+        output_types = normal_hints
+
+    try:
+      argspec = inspect.getfullargspec(transform.fn._process_argspec_fn())
+      if len(argspec.args):
+        arg_index = 0
+        if argspec.args[0] == 'self':
+          arg_index = 1
+        transform.fn._runtime_parameter_name = argspec.args[arg_index]
+        if isinstance(input_types, dict):
+          input_types = (input_types[argspec.args[arg_index]], )
+    except TypeError:
+      pass
+
+    if input_types and len(input_types):
+      input_types = input_types[0]
+
+    if output_types and len(output_types):
+      output_types = output_types[0]
+
+    transform.fn._runtime_type_hints = type_hints._replace(
+        input_types=input_types, output_types=output_types)

Review comment:
       What if this transform has a type hint and the DoFn itself has a type 
hint? Will we check both? 
   
   I would probably create the dictionary of {type hint source string: type 
hint} right here, pre-packaged and ready to be used directly from the worker.

##########
File path: sdks/python/apache_beam/runners/worker/opcounters.py
##########
@@ -202,6 +209,37 @@ def __init__(
     self._sample_counter = 0
     self._next_sample = 0
 
+    self.producer_type_hints = None
+    self.producer_full_label = None
+    self.producer_parameter_name = None
+
+    if producer and hasattr(producer, 'spec') and hasattr(producer.spec,

Review comment:
       This logic belongs in the ParDoOperator (which I think already has a 
deserialized fn in hand). 

##########
File path: website/www/site/content/en/documentation/sdks/python-type-safety.md
##########
@@ -210,7 +210,21 @@ However, if you enable runtime type checking, the code is 
guaranteed to fail at
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" 
type_hints_runtime_on >}}
 {{< /highlight >}}
 
-Note that because runtime type checks are done for each `PCollection` element, 
enabling this feature may incur a significant performance penalty. It is 
therefore recommended that runtime type checks are disabled for production 
pipelines.
+Note that because runtime type checks are done for each `PCollection` element, 
enabling this feature may incur a significant performance penalty. It is 
therefore recommended that runtime type checks are disabled for production 
pipelines. See the following section for a quicker, production-friendly 
alternative.
+
+### Faster Runtime Type Checking
+You can enable faster, sampling-based runtime type checking by setting the 
pipeline option `performance_runtime_type_check` to `True`.
+
+This is a Python 3 only feature that works by runtime type checking a small 
subset of values, called a sample, using optimized Cython code.

Review comment:
       Why is this Python 3 only? (Not that Python 2 is going to be around for 
long...)

##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -466,6 +466,13 @@ def _add_argparse_args(cls, parser):
         help='Enable type checking at pipeline execution '
         'time. NOTE: only supported with the '
         'DirectRunner')
+    parser.add_argument(
+        '--performance_runtime_type_check',
+        default=False,
+        action='store_true',
+        help='Enable faster type checking via sampling at pipeline execution '
+        'time. NOTE: only supported with the '

Review comment:
       This should say "only supported with portable runners (including the 
direct runner)."

##########
File path: website/www/site/content/en/documentation/sdks/python-type-safety.md
##########
@@ -210,7 +210,21 @@ However, if you enable runtime type checking, the code is 
guaranteed to fail at
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" 
type_hints_runtime_on >}}
 {{< /highlight >}}
 
-Note that because runtime type checks are done for each `PCollection` element, 
enabling this feature may incur a significant performance penalty. It is 
therefore recommended that runtime type checks are disabled for production 
pipelines.
+Note that because runtime type checks are done for each `PCollection` element, 
enabling this feature may incur a significant performance penalty. It is 
therefore recommended that runtime type checks are disabled for production 
pipelines. See the following section for a quicker, production-friendly 
alternative.
+
+### Faster Runtime Type Checking
+You can enable faster, sampling-based runtime type checking by setting the 
pipeline option `performance_runtime_type_check` to `True`.
+
+This is a Python 3 only feature that works by runtime type checking a small 
subset of values, called a sample, using optimized Cython code.
+
+Currently, this feature does not support runtime type checking for side inputs 
or combine operations. Specifically, this feature will not runtime type check 
the following transforms:

Review comment:
       Rather than enumerating the transforms here, I would just say that it 
doesn't run on combining operations. 

##########
File path: sdks/python/apache_beam/runners/worker/opcounters.py
##########
@@ -202,6 +209,37 @@ def __init__(
     self._sample_counter = 0
     self._next_sample = 0
 
+    self.producer_type_hints = None
+    self.producer_full_label = None
+    self.producer_parameter_name = None
+
+    if producer and hasattr(producer, 'spec') and hasattr(producer.spec,
+                                                          'serialized_fn'):
+      fns = pickler.loads(producer.spec.serialized_fn)
+      if fns:
+        if hasattr(fns[0], '_runtime_type_hints'):
+          self.producer_type_hints = fns[0]._runtime_type_hints
+        if hasattr(fns[0], '_full_label'):
+          self.producer_full_label = fns[0]._full_label
+        if hasattr(fns[0], '_runtime_parameter_name'):
+          self.producer_parameter_name = fns[0]._runtime_parameter_name
+
+    self.consumer_type_hints = []
+    self.consumer_full_labels = []
+    self.consumer_parameter_names = []
+
+    if consumers:

Review comment:
       Move this logic into the visitor, rather than doing graph inspection on 
the workers. This will also allow us to consolidate all type checks into a 
single dict, rather than having redundant code for producer/consumer. 

##########
File path: sdks/python/apache_beam/pipeline.py
##########
@@ -520,10 +520,26 @@ def run(self, test_runner_api='AUTO'):
             self._options,
             allow_proto_holders=True).run(False)
 
+      if (self._options.view_as(TypeOptions).runtime_type_check and
+          self._options.view_as(TypeOptions).performance_runtime_type_check):
+        raise RuntimeError(

Review comment:
       Why not?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to