aaltay commented on a change in pull request #14108:
URL: https://github.com/apache/beam/pull/14108#discussion_r584020349



##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -1511,3 +1512,41 @@ def windows(self):
       raise AttributeError('windows not accessible in this context')
     else:
       return self.windowed_value.windows
+
+
+def group_by_key_input_visitor(deterministic_key_coders):
+  # Importing here to avoid a circular dependency
+  from apache_beam.pipeline import PipelineVisitor
+
+  class GroupByKeyInputVisitor(PipelineVisitor):
+    """A visitor that replaces `Any` element type for input `PCollection` of
+    a `GroupByKey` with a `KV` type.
+
+    TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,

Review comment:
       nit: s/SDk/SDK

##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -1511,3 +1512,41 @@ def windows(self):
       raise AttributeError('windows not accessible in this context')
     else:
       return self.windowed_value.windows
+
+
+def group_by_key_input_visitor(deterministic_key_coders):
+  # Importing here to avoid a circular dependency
+  from apache_beam.pipeline import PipelineVisitor
+
+  class GroupByKeyInputVisitor(PipelineVisitor):
+    """A visitor that replaces `Any` element type for input `PCollection` of
+    a `GroupByKey` with a `KV` type.
+
+    TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
+    we could directly replace the coder instead of mutating the element type.
+    """
+    def __init__(self, deterministic_key_coders=True):
+      self.deterministic_key_coders = deterministic_key_coders
+
+    def enter_composite_transform(self, transform_node):
+      self.visit_transform(transform_node)
+
+    def visit_transform(self, transform_node):
+      # Imported here to avoid circular dependencies.
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apache_beam.transforms.core import GroupByKey

Review comment:
       Why not do this import at L1519 (next to the `PipelineVisitor` import at the top of `group_by_key_input_visitor`) instead?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -179,6 +178,13 @@ def run_pipeline(self,
       self._default_environment = environments.SubprocessSDKEnvironment(
           command_string=command_string)
 
+    if running_mode == 'in_memory' and self._num_workers != 1:
+      _LOGGER.warning(
+          'If direct_num_workers is not equal to 1, direct_running_mode '

Review comment:
       Maybe also log running_mode and self._num_workers as part of the message.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to