robertwb commented on a change in pull request #14390:
URL: https://github.com/apache/beam/pull/14390#discussion_r612043687



##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -445,6 +445,17 @@ def _add_argparse_args(cls, parser):
         action='store_true',
         help='Whether to enable streaming mode.')
 
+    parser.add_argument(
+        '--resource_hint',
+        dest='resource_hints',
+        action='append',
+        default=None,

Review comment:
       Should default be an empty iterable? 
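
For example, a minimal sketch of that change (the flag definition is copied from the diff above; the comment is mine):

```python
parser.add_argument(
    '--resource_hint',
    dest='resource_hints',
    action='append',
    # An empty list as the default lets downstream code iterate over the
    # hints without a None check.
    default=[])
```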

##########
File path: sdks/python/apache_beam/pipeline.py
##########
@@ -1226,7 +1255,7 @@ def transform_to_runner_api(
     transform_urn = transform_spec.urn if transform_spec else None
     if (not environment_id and
         (transform_urn not in Pipeline.runner_implemented_transforms())):
-      environment_id = context.default_environment_id()
+      environment_id = context.get_environment_id_for_transform(self.transform)

Review comment:
       Let's just pass the hints here, rather than the transform itself (which 
makes it seem like the context cares about the type or other attributes of the 
transform). 
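
Roughly (the method name below is hypothetical, just to show the shape):

```python
# The context receives only the hints, not the transform itself.
environment_id = context.get_environment_id_for_resource_hints(
    self.transform.get_resource_hints() if self.transform else {})
```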

##########
File path: sdks/python/apache_beam/transforms/environments.py
##########
@@ -109,12 +113,25 @@ class Environment(object):
   _urn_to_env_cls = {}  # type: Dict[str, type]
 
   def __init__(self,
-               capabilities,  # type: Iterable[str]
-               artifacts,  # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
-              ):
+      capabilities,  # type: Iterable[str]
+      artifacts,  # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
+      resource_hints,  # type: Optional[Mapping[str, bytes]]
+               ):
     # type: (...) -> None
     self._capabilities = capabilities
     self._artifacts = artifacts
+    self._resource_hints = dict(resource_hints) if resource_hints else {}
+
+  def __eq__(self, other):
+    return (
+        self.__class__ == other.__class__

Review comment:
       I don't think this is safe, one can have multiple, distinct docker 
environments (for example). 
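
Concretely (hypothetical images, just to illustrate the hazard):

```python
# Equality based only on the class (plus hints) would conflate two
# genuinely different environments:
env_a = DockerEnvironment(container_image='repo/worker:a')
env_b = DockerEnvironment(container_image='repo/worker:b')
assert env_a != env_b  # must hold; a class-only __eq__ would break this
```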

##########
File path: sdks/python/apache_beam/transforms/resources.py
##########
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A module for defining resource requirements for execution of transforms.
+
+Pipeline authors can use resource hints to provide additional information to
+runners about the desired aspects of the execution environment.
+
+Resource hints can be specified at the transform level for parts of the pipeline,
+or globally via the --resource_hint pipeline option.
+
+See also: PTransforms.with_resource_hints().
+"""
+
+from typing import Any
+from typing import Callable
+from typing import Dict
+
+from apache_beam.portability.common_urns import resource_hints
+
+__all__ = ['parse_resource_hints', 'get_merged_hint_value']
+
+
+def _parse_str(value):
+  if not isinstance(value, str):
+    raise ValueError()
+  return value.encode('ascii')
+
+
+def _parse_int(value):
+  if isinstance(value, str):
+    value = int(value)
+  if not isinstance(value, int):
+    raise ValueError()
+  return str(value).encode('ascii')
+
+
+def _parse_any(_):
+  # For hints where only a key is relevant and value is set to None or any value
+  return b'1'
+
+
+def _parse_storage_size_str(value):  # type: (str) -> bytes
+  """Parses a human-friendly storage size string into a number of bytes.
+  """
+  if not isinstance(value, str):
+    value = str(value)
+  value = value.strip().replace(" ", "")
+  units = {
+      'PiB': 2**50,
+      'TiB': 2**40,
+      'GiB': 2**30,
+      'MiB': 2**20,
+      'KiB': 2**10,
+      'PB': 10**15,
+      'TB': 10**12,
+      'GB': 10**9,
+      'MB': 10**6,
+      'KB': 10**3,
+  }
+  multiplier = 1
+  for suffix in units:
+    if value.endswith(suffix):
+      multiplier = units[suffix]
+      value = value[:-len(suffix)]
+      break
+
+  return str(round(float(value) * multiplier)).encode('ascii')
+
+
+def _use_max(v1, v2):

Review comment:
      Did you see my comment on making classes for this rather than having 
dicts of methods? 
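
Something in this direction, say (a rough, illustrative sketch, not a final API; `_parse_storage_size_str` is the helper from the diff above):

```python
class ResourceHint(object):
  """Sketch: one class per hint instead of parallel dicts of functions."""
  urn = None  # type: str

  @classmethod
  def parse(cls, value):  # type: (str) -> bytes
    raise NotImplementedError

  @classmethod
  def merge(cls, v1, v2):  # type: (bytes, bytes) -> bytes
    raise NotImplementedError


class MinRamHint(ResourceHint):
  urn = 'beam:resources:min_ram_per_vcpu_bytes:v1'

  @classmethod
  def parse(cls, value):
    return _parse_storage_size_str(value)

  @classmethod
  def merge(cls, v1, v2):
    # Keep the stricter (larger) requirement on conflict.
    return str(max(int(v1), int(v2))).encode('ascii')
```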

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
##########
@@ -863,6 +863,31 @@ def test_pack_combiners_enabled_by_experiment(self):
     self._test_pack_combiners(
         PipelineOptions(self.default_properties), expect_packed=True)
 
+  def test_resource_hints_translation(self):
+    runner = DataflowRunner()
+
+    self.default_properties.append('--experiments=use_legacy_bq_sink')
+    self.default_properties.append('--resource_hint=min_ram_per_vcpu=10GB')
+    with beam.Pipeline(runner=runner,
+                       options=PipelineOptions(self.default_properties)) as p:
+      # pylint: disable=expression-not-assigned
+      (
+          p
+          | beam.Create([1])
+          | 'MapWithHints' >> beam.Map(lambda x: x + 1).with_resource_hints(
+              min_ram_per_vcpu='20GB',
+              accelerator='type:nvidia-tesla-k80;count:1;install-nvidia-drivers'
+          ))
+
+    step = self._find_step(runner.job, 'MapWithHints')
+    self.assertEqual(
+        step['properties']['resource_hints'],
+        {
+            'beam:resources:min_ram_per_vcpu_bytes:v1': '10000000000',

Review comment:
      FWIW, memory is almost always GiB, even when written GB. This is the 
convention for GCE too, so we should probably follow that. 
https://cloud.google.com/compute/docs/instances/creating-instance-with-custom-machine-type
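
The difference is material for the assertion above:

```python
# What "10GB" means under each convention:
print(10 * 10**9)  # 10000000000 bytes (decimal GB, what the test asserts)
print(10 * 2**30)  # 10737418240 bytes (GiB, the GCE convention)
```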

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -707,6 +709,17 @@ def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
             for item in DisplayData.create_from(transform_node.transform).items
         ])
 
+    resource_hints = transform_node.transform.get_resource_hints() or {}
+    resource_hints.update(self._default_environment.resource_hints())
+    if resource_hints:
+      step.add_property(
+          PropertyNames.RESOURCE_HINTS,
+          {
+              hint: quote_from_bytes(value)
+              for hint,

Review comment:
       You can write `for (hint, value) in...` to work around yapf bug.
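
i.e. (the `.items()` call is an assumption, since the hunk is truncated):

```python
step.add_property(
    PropertyNames.RESOURCE_HINTS,
    {
        hint: quote_from_bytes(value)
        for (hint, value) in resource_hints.items()  # parens avoid the yapf split
    })
```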

##########
File path: sdks/python/apache_beam/pipeline.py
##########
@@ -976,6 +981,28 @@ def leave_composite_transform(self, transform_node):
     pass
 
 
+class ResourceHintsPropagator(PipelineVisitor):
+  """Propagates resource hints set on composite transforms to subtransforms.
+  """
+  def visit_transform(self, transform_node):
+    # type: (AppliedPTransform) -> None
+    if (transform_node.parent is not None and
+        transform_node.parent.transform is not None and
+        transform_node.parent.transform.get_resource_hints()):
+      transform_node.transform._merge_hints_from_outer_composite(

Review comment:
       Looking over this code (here and elsewhere), it seems preferable for 
resource hints to be attached to the transform_node rather than the transform. 
We can populate the hints on the node when constructing it from a transform. 
This also solves the issue that a single Transform can be applied in multiple 
places, gives us a place to put the "outermost" ones that come from the 
pipeline options, and allows users to override `get_resource_hints` with the 
expected behavior. (It also solves the issue that PTransforms are allowed to be 
re-used, but their application in the pipeline shouldn't be.)
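
A sketch of the idea (AppliedPTransform internals simplified to the relevant bits):

```python
class AppliedPTransform(object):
  def __init__(self, parent, transform, full_label):
    self.parent = parent
    self.transform = transform
    self.full_label = full_label
    # Snapshot the hints per application: two applications of the same
    # PTransform instance then carry independent hint state, and the
    # outermost hints from pipeline options have a natural home here.
    self.resource_hints = dict(
        transform.get_resource_hints()) if transform else {}
```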
   

##########
File path: sdks/python/apache_beam/pipeline_test.py
##########
@@ -960,6 +963,159 @@ def annotations(self):
             transform.annotations['proto'], some_proto.SerializeToString())
     self.assertEqual(seen, 2)
 
+  def test_runner_api_roundtrip_preserves_resource_hints(self):
+    resources._KNOWN_HINTS.update(
+        {'foo_hint': lambda value: {
+            'foo_urn': value
+        }})
+
+    p = TestPipeline()

Review comment:
       FWIW, there's no need to use TestPipeline here. It's mostly for 
ValidatesRunner integration tests.
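
i.e., a plain pipeline is enough:

```python
p = beam.Pipeline()  # no TestPipeline needed outside ValidatesRunner tests
```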

##########
File path: sdks/python/apache_beam/transforms/environments.py
##########
@@ -234,6 +257,35 @@ def from_options(cls, options):
     raise NotImplementedError
 
 
[email protected]_urn(common_urns.environments.DEFAULT.urn, None)
+class DefaultEnvironment(Environment):
+  """Used as a stub when context is missing a default environment."""
+  def __init__(

Review comment:
       You can simply omit the constructor if all it does is call super. 
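
That is, roughly:

```python
@Environment.register_urn(common_urns.environments.DEFAULT.urn, None)
class DefaultEnvironment(Environment):
  """Used as a stub when context is missing a default environment."""
  # No __init__: Environment.__init__ already does everything needed.
```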

##########
File path: sdks/python/apache_beam/transforms/environments.py
##########
@@ -257,8 +311,8 @@ def __init__(
         (self.container_image))
 
   def __eq__(self, other):
-    return self.__class__ == other.__class__ \
-           and self.container_image == other.container_image
+    return (
+        super().__eq__(other) and self.container_image == other.container_image)
 
   def __hash__(self):
     return hash((self.__class__, self.container_image))

Review comment:
       This one doesn't handle resource hints. (Maybe that's OK 'cause it'll 
just be a collision.)
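
If the collisions ever mattered, the hints could be folded in, e.g.:

```python
def __hash__(self):
  return hash((
      self.__class__,
      self.container_image,
      frozenset(self._resource_hints.items())))
```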

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
##########
@@ -863,6 +863,31 @@ def test_pack_combiners_enabled_by_experiment(self):
     self._test_pack_combiners(
         PipelineOptions(self.default_properties), expect_packed=True)
 
+  def test_resource_hints_translation(self):
+    runner = DataflowRunner()
+
+    self.default_properties.append('--experiments=use_legacy_bq_sink')
+    self.default_properties.append('--resource_hint=min_ram_per_vcpu=10GB')

Review comment:
       I think we should merge these as well. 
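
e.g., for a min_ram-style hint the stricter (larger) value plausibly wins, so the merged assertion would look something like (sketch, not the final semantics):

```python
self.assertEqual(
    step['properties']['resource_hints']
        ['beam:resources:min_ram_per_vcpu_bytes:v1'],
    str(20 * 10**9))  # transform-level 20GB beats the option-level 10GB
```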

##########
File path: sdks/python/apache_beam/runners/pipeline_context.py
##########
@@ -274,5 +282,44 @@ def to_runner_api(self):
     return context_proto
 
   def default_environment_id(self):
-    # type: () -> Optional[str]
+    # type: () -> str
     return self._default_environment_id
+
+  def get_environment_id_for_transform(
+      self, transform):  # type: (Optional[ptransform.PTransform]) -> str
+    """Returns an environment id where the transform can be executed."""
+    if not transform or not transform.get_resource_hints():
+      return self.default_environment_id()
+
+    def merge_resource_hints(
+        environment_id,
+        transform):  # type: (str, ptransform.PTransform) -> Dict[str, bytes]
+      # TODO: add test.
+      # Hints already defined in the environment take precedence over hints

Review comment:
      As above, we should merge, not override. 
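
e.g. (`get_merged_hint_value` comes from resources.py's `__all__` above; its signature here is an assumption):

```python
def merge_resource_hints(env_hints, transform_hints):
  # type: (Dict[str, bytes], Dict[str, bytes]) -> Dict[str, bytes]
  merged = dict(env_hints)
  for urn, value in transform_hints.items():
    # Combine colliding hints instead of letting one side win outright.
    merged[urn] = (
        get_merged_hint_value(urn, merged[urn], value)
        if urn in merged else value)
  return merged
```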

##########
File path: sdks/python/apache_beam/transforms/environments.py
##########
@@ -109,12 +113,25 @@ class Environment(object):
   _urn_to_env_cls = {}  # type: Dict[str, type]
 
   def __init__(self,
-               capabilities,  # type: Iterable[str]
-               artifacts,  # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
-              ):
+      capabilities,  # type: Iterable[str]
+      artifacts,  # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
+      resource_hints,  # type: Optional[Mapping[str, bytes]]
+               ):
     # type: (...) -> None
     self._capabilities = capabilities
     self._artifacts = artifacts
+    self._resource_hints = dict(resource_hints) if resource_hints else {}
+
+  def __eq__(self, other):

Review comment:
       artifacts? (Capabilities is probably constant...)
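
i.e., perhaps (sketch; per the earlier comment, class equality alone still isn't sufficient for subclasses with their own state):

```python
def __eq__(self, other):
  return (
      self.__class__ == other.__class__ and
      self._artifacts == other._artifacts and
      self._resource_hints == other._resource_hints)
```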

##########
File path: sdks/python/apache_beam/transforms/environments.py
##########
@@ -337,18 +399,21 @@ def __init__(
       env=None,  # type: Optional[Mapping[str, str]]
       capabilities=(),  # type: Iterable[str]
       artifacts=(),  # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
+      resource_hints=None,  # type: Optional[Mapping[str, bytes]]
   ):
     # type: (...) -> None
-    super(ProcessEnvironment, self).__init__(capabilities, artifacts)
+    super(ProcessEnvironment,
+          self).__init__(capabilities, artifacts, resource_hints)
     self.command = command
     self.os = os
     self.arch = arch
     self.env = env or {}
 
   def __eq__(self, other):
-    return self.__class__ == other.__class__ \
-      and self.command == other.command and self.os == other.os \
-      and self.arch == other.arch and self.env == other.env
+    return (
+        super().__eq__(other) and self.command == other.command and

Review comment:
       (This is starting to feel very redundant. I wonder if this could be 
refactored in such a way that it wasn't so redundant...like maybe one could 
have an EnvironmentType property that abstracts away the different types of 
environments, and Environment could handle all the common stuff... maybe out of 
scope for now.)
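
A rough shape of that refactoring, for the record (names illustrative; out of scope as noted):

```python
class Environment(object):
  # Subclasses override with the fields that identify them.
  def _key(self):  # type: () -> tuple
    return ()

  def __eq__(self, other):
    return (
        self.__class__ == other.__class__ and
        self._key() == other._key() and
        self._resource_hints == other._resource_hints)

  def __hash__(self):
    return hash((self.__class__, self._key()))


class ProcessEnvironment(Environment):
  def _key(self):
    return (self.command, self.os, self.arch, tuple(sorted(self.env.items())))
```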



