[ 
https://issues.apache.org/jira/browse/BEAM-3418?focusedWorklogId=83388&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83388
 ]

ASF GitHub Bot logged work on BEAM-3418:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Mar/18 23:45
            Start Date: 22/Mar/18 23:45
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #4587: [BEAM-3418] Send 
worker_id in all grpc channels to runner harness
URL: https://github.com/apache/beam/pull/4587
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index e503da9eb5b..9db1cab96f4 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -169,10 +169,17 @@ def __init__(self, packages, options, 
environment_version, pipeline_url):
     if job_type.startswith('FNAPI_'):
       runner_harness_override = (
           dependency.get_runner_harness_container_image())
+      self.debug_options.experiments = self.debug_options.experiments or []
       if runner_harness_override:
-        self.debug_options.experiments = self.debug_options.experiments or []
         self.debug_options.experiments.append(
             'runner_harness_container_image=' + runner_harness_override)
+      # Add use_multiple_sdk_containers flag if it's not already present. Do not
+      # add the flag if 'no_use_multiple_sdk_containers' is present.
+      # TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK
+      # till version 2.4.
+      if ('use_multiple_sdk_containers' not in self.proto.experiments and
+          'no_use_multiple_sdk_containers' not in self.proto.experiments):
+        self.debug_options.experiments.append('use_multiple_sdk_containers')
     # Experiments
     if self.debug_options.experiments:
       for experiment in self.debug_options.experiments:
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py 
b/sdks/python/apache_beam/runners/worker/data_plane.py
index f554646c659..7c79c4cc7a7 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -34,6 +34,7 @@
 from apache_beam.coders import coder_impl
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker.worker_id_interceptor import 
WorkerIdInterceptor
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -311,6 +312,9 @@ def create_data_channel(self, remote_grpc_port):
               # controlled in a layer above.
               options=[("grpc.max_receive_message_length", -1),
                        ("grpc.max_send_message_length", -1)])
+          # Add workerId to the grpc channel
+          grpc_channel = grpc.intercept_channel(grpc_channel,
+                                                WorkerIdInterceptor())
           self._data_channel_cache[url] = GrpcClientDataChannel(
               beam_fn_api_pb2_grpc.BeamFnDataStub(grpc_channel))
     return self._data_channel_cache[url]
diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py 
b/sdks/python/apache_beam/runners/worker/log_handler.py
index 6d8a1d92671..152659e0a3f 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -25,6 +25,7 @@
 
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker.worker_id_interceptor import 
WorkerIdInterceptor
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -48,7 +49,9 @@ class FnApiLogRecordHandler(logging.Handler):
 
   def __init__(self, log_service_descriptor):
     super(FnApiLogRecordHandler, self).__init__()
-    self._log_channel = grpc.insecure_channel(log_service_descriptor.url)
+    self._log_channel = grpc.intercept_channel(
+        grpc.insecure_channel(log_service_descriptor.url),
+        WorkerIdInterceptor())
     self._logging_stub = beam_fn_api_pb2_grpc.BeamFnLoggingStub(
         self._log_channel)
     self._log_entry_queue = queue.Queue()
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 1988490013c..c77659b3479 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -34,6 +34,7 @@
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker import bundle_processor
 from apache_beam.runners.worker import data_plane
+from apache_beam.runners.worker.worker_id_interceptor import 
WorkerIdInterceptor
 
 
 class SdkHarness(object):
@@ -42,7 +43,8 @@ class SdkHarness(object):
   def __init__(self, control_address, worker_count):
     self._worker_count = worker_count
     self._worker_index = 0
-    self._control_channel = grpc.insecure_channel(control_address)
+    self._control_channel = grpc.intercept_channel(
+        grpc.insecure_channel(control_address), WorkerIdInterceptor())
     self._data_channel_factory = data_plane.GrpcClientDataChannelFactory()
     self.workers = queue.Queue()
     # one thread is enough for getting the progress report.
diff --git a/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py 
b/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
new file mode 100644
index 00000000000..0a71292f773
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Client Interceptor to inject worker_id"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import collections
+import os
+import uuid
+
+import grpc
+
+
+class _ClientCallDetails(
+    collections.namedtuple('_ClientCallDetails',
+                           ('method', 'timeout', 'metadata', 'credentials')),
+    grpc.ClientCallDetails):
+  pass
+
+
+class WorkerIdInterceptor(grpc.StreamStreamClientInterceptor):
+
+  # TODO: (BEAM-3904) Remove defaulting to UUID when worker_id is not present
+  # and throw an exception in worker_id_interceptor.py after we have rolled out
+  # the corresponding container changes.
+  # Unique worker Id for this worker.
+  _worker_id = os.environ['WORKER_ID'] if os.environ.has_key(
+      'WORKER_ID') else str(uuid.uuid4())
+
+  def __init__(self):
+    pass
+
+  def intercept_stream_stream(self, continuation, client_call_details,
+                              request_iterator):
+    metadata = []
+    if client_call_details.metadata is not None:
+      metadata = list(client_call_details.metadata)
+    if 'worker_id' in metadata:
+      raise RuntimeError('Header metadata already has worker_id.')
+    metadata.append(('worker_id', self._worker_id))
+    new_client_details = _ClientCallDetails(
+        client_call_details.method, client_call_details.timeout, metadata,
+        client_call_details.credentials)
+    return continuation(new_client_details, request_iterator)
diff --git 
a/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py 
b/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py
new file mode 100644
index 00000000000..411e3099d19
--- /dev/null
+++ b/sdks/python/apache_beam/runners/worker/worker_id_interceptor_test.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Test for WorkerIdInterceptor"""
+from __future__ import absolute_import
+
+import collections
+import logging
+import unittest
+
+import grpc
+
+from apache_beam.runners.worker.worker_id_interceptor import 
WorkerIdInterceptor
+
+
+class _ClientCallDetails(
+    collections.namedtuple('_ClientCallDetails',
+                           ('method', 'timeout', 'metadata', 'credentials')),
+    grpc.ClientCallDetails):
+  pass
+
+
+class WorkerIdInterceptorTest(unittest.TestCase):
+
+  def test_worker_id_insertion(self):
+    worker_id_key = 'worker_id'
+    headers_holder = {}
+
+    def continuation(client_details, request_iterator):
+      headers_holder.update({
+          worker_id_key: dict(client_details.metadata).get(worker_id_key)
+      })
+
+    WorkerIdInterceptor._worker_id = 'my_worker_id'
+
+    WorkerIdInterceptor().intercept_stream_stream(continuation,
+                                                  _ClientCallDetails(
+                                                      None, None, None, None),
+                                                  [])
+    self.assertEqual(headers_holder[worker_id_key], 'my_worker_id',
+                     'worker_id_key not set')
+
+  def test_failure_when_worker_id_exists(self):
+    worker_id_key = 'worker_id'
+    headers_holder = {}
+
+    def continuation(client_details, request_iterator):
+      headers_holder.update({
+          worker_id_key: dict(client_details.metadata).get(worker_id_key)
+      })
+
+    WorkerIdInterceptor._worker_id = 'my_worker_id'
+
+    with self.assertRaises(RuntimeError):
+      WorkerIdInterceptor().intercept_stream_stream(
+          continuation, _ClientCallDetails(None, None, {'worker_id': '1'},
+                                           None), [])
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 69f031b7631..14e261947bb 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -105,6 +105,7 @@ func main() {
 
        // (3) Invoke python
 
+       os.Setenv("WORKER_ID", *id)
        os.Setenv("PIPELINE_OPTIONS", options)
        os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
        os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", 
proto.MarshalTextString(&pbpipeline.ApiServiceDescriptor{Url: 
*loggingEndpoint}))
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 5ed623b2293..02c02f56c42 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -99,7 +99,7 @@ def get_version():
     'avro>=1.8.1,<2.0.0',
     'crcmod>=1.7,<2.0',
     'dill==0.2.6',
-    'grpcio>=1.0,<2',
+    'grpcio>=1.8,<2',
     'hdfs>=2.1.0,<3.0.0',
     'httplib2>=0.8,<0.10',
     'mock>=1.0.1,<3.0.0',


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 83388)
    Time Spent: 4h  (was: 3h 50m)

> Python Fnapi - Support Multiple SDK workers on a single VM
> ----------------------------------------------------------
>
>                 Key: BEAM-3418
>                 URL: https://issues.apache.org/jira/browse/BEAM-3418
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-harness
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Major
>              Labels: performance, portability
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> Support multiple python SDK process on a VM to fully utilize a machine.
> Each SDK Process will work in isolation and interact with Runner HarnessĀ 
> independently.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to