[ 
https://issues.apache.org/jira/browse/BEAM-3883?focusedWorklogId=104863&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104863
 ]

ASF GitHub Bot logged work on BEAM-3883:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/May/18 23:41
            Start Date: 22/May/18 23:41
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5273: [BEAM-3883] Adding Client to push artifacts to artifact staging service
URL: https://github.com/apache/beam/pull/5273
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/python/apache_beam/runners/portability/portable_stager.py b/sdks/python/apache_beam/runners/portability/portable_stager.py
new file mode 100644
index 00000000000..7113a251f24
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/portable_stager.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""A :class:`FileHandler` to work with :class:`ArtifactStagingServiceStub`.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import os
+
+from apache_beam.portability.api import beam_artifact_api_pb2
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.runners.portability.stager import Stager
+
+
+class PortableStager(Stager):
+  """An implementation of :class:`Stager` to stage files on
+  ArtifactStagingService.
+
+  The class keeps track of pushed files and commits the manifest once all
+  files are uploaded.
+
+  Note: This class is not thread safe; callers must ensure thread safety
+  themselves.
+  """
+
+  def __init__(self, artifact_service_channel):
+    """Creates a new Stager to stage files to ArtifactStagingService.
+
+    Args:
+      artifact_service_channel: Channel used to interact with
+        ArtifactStagingService. The user owns the channel and should close it
+        when finished.
+    """
+    super(PortableStager, self).__init__()
+    self._artifact_staging_stub = beam_artifact_api_pb2_grpc.\
+        ArtifactStagingServiceStub(channel=artifact_service_channel)
+    self._artifacts = []
+
+  def stage_artifact(self, local_path_to_artifact, artifact_name):
+    """Stage a file to ArtifactStagingService.
+
+    Args:
+      local_path_to_artifact: Path of file to be uploaded.
+      artifact_name: File name on the artifact server.
+    """
+    if not os.path.isfile(local_path_to_artifact):
+      raise ValueError(
+          'Cannot stage {0} to artifact server. Only local files can be staged.'
+          .format(local_path_to_artifact))
+
+    def artifact_request_generator():
+      metadata = beam_artifact_api_pb2.ArtifactMetadata(name=artifact_name)
+      request = beam_artifact_api_pb2.PutArtifactRequest(metadata=metadata)
+      yield request
+      with open(local_path_to_artifact, 'rb') as f:
+        while True:
+          chunk = f.read(1 << 21)  # 2MB
+          if not chunk:
+            break
+          request = beam_artifact_api_pb2.PutArtifactRequest(
+              data=beam_artifact_api_pb2.ArtifactChunk(data=chunk))
+          yield request
+      self._artifacts.append(metadata)
+
+    self._artifact_staging_stub.PutArtifact(artifact_request_generator())
+
+  def commit_manifest(self):
+    manifest = beam_artifact_api_pb2.Manifest(artifact=self._artifacts)
+    self._artifacts = []
+    self._artifact_staging_stub.CommitManifest(
+        beam_artifact_api_pb2.CommitManifestRequest(manifest=manifest))
diff --git a/sdks/python/apache_beam/runners/portability/portable_stager_test.py b/sdks/python/apache_beam/runners/portability/portable_stager_test.py
new file mode 100644
index 00000000000..181007de5f0
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/portable_stager_test.py
@@ -0,0 +1,162 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Test cases for :mod:`portable_stager`."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import filecmp
+import logging
+import os
+import random
+import shutil
+import string
+import tempfile
+import unittest
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_artifact_api_pb2
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.runners.portability import portable_stager
+
+
+class PortableStagerTest(unittest.TestCase):
+
+  def setUp(self):
+    self._temp_dir = tempfile.mkdtemp()
+    self._remote_dir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    if self._temp_dir:
+      shutil.rmtree(self._temp_dir)
+    if self._remote_dir:
+      shutil.rmtree(self._remote_dir)
+
+  def _stage_files(self, files):
+    """Utility method to stage files.
+
+      Args:
+        files: a list of tuples of the form [(local_name, remote_name),...]
+          describing the name of the artifacts in local temp folder and desired
+          name in staging location.
+    """
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    staging_service = TestLocalFileSystemArtifactStagingServiceServicer(
+        self._remote_dir)
+    beam_artifact_api_pb2_grpc.add_ArtifactStagingServiceServicer_to_server(
+        staging_service, server)
+    test_port = server.add_insecure_port('[::]:0')
+    server.start()
+    stager = portable_stager.PortableStager(
+        grpc.insecure_channel('localhost:%s' % test_port))
+    for from_file, to_file in files:
+      stager.stage_artifact(
+          local_path_to_artifact=os.path.join(self._temp_dir, from_file),
+          artifact_name=to_file)
+    stager.commit_manifest()
+    return staging_service.manifest.artifact
+
+  def test_stage_single_file(self):
+    from_file = 'test_local.txt'
+    to_file = 'test_remote.txt'
+
+    with open(os.path.join(self._temp_dir, from_file), 'wb') as f:
+      f.write(b'abc')
+
+    copied_files = self._stage_files([('test_local.txt', 'test_remote.txt')])
+    self.assertTrue(
+        filecmp.cmp(
+            os.path.join(self._temp_dir, from_file),
+            os.path.join(self._remote_dir, to_file)))
+    self.assertEqual(
+        [to_file],
+        [staged_file_metadata.name for staged_file_metadata in copied_files])
+
+  def test_stage_multiple_files(self):
+
+    files = [
+        ('test_local_100.txt', 'test_remote_100.txt', 100, 's'),  #
+        ('test_local_100.binary', 'test_remote_100.binary', 100, 'b'),  #
+        ('test_local_1k.txt', 'test_remote_1k.txt', 1 << 10, 's'),  #
+        ('test_local_1k.binary', 'test_remote_1k.binary', 1 << 10, 'b'),  #
+        ('test_local_1m.txt', 'test_remote_1m.txt', 1 << 20, 's'),
+        ('test_local_1m.binary', 'test_remote_1m.binary', 1 << 20, 'b'),
+        ('test_local_10m.txt', 'test_remote_10m.txt', 10 * (1 << 20), 's'),
+        ('test_local_10m.binary', 'test_remote_10m.binary', 10 * (1 << 20), 'b')
+    ]
+
+    for (from_file, _, size, type) in files:
+      chars = list(string.printable)
+      random.shuffle(chars)
+      chars = list(int(size / len(chars)) * chars + chars[0:size % len(chars)])
+      if type == 's':
+        with open(
+            os.path.join(self._temp_dir, from_file), 'w',
+            buffering=2 << 22) as f:
+          f.write(''.join(chars))
+      if type == 'b':
+        with open(
+            os.path.join(self._temp_dir, from_file), 'wb',
+            buffering=2 << 22) as f:
+          f.write(''.join(chars).encode('ascii'))
+
+    copied_files = self._stage_files(
+        [(from_file, to_file) for (from_file, to_file, _, _) in files])
+
+    for from_file, to_file, _, _ in files:
+      from_file = os.path.join(self._temp_dir, from_file)
+      to_file = os.path.join(self._remote_dir, to_file)
+      self.assertTrue(
+          filecmp.cmp(from_file, to_file),
+          'Local file {0} and remote file {1} are not the same.'.format(
+              from_file, to_file))
+    self.assertEqual(
+        sorted(to_file for _, to_file, _, _ in files),
+        sorted(metadata.name for metadata in copied_files))
+
+
+class TestLocalFileSystemArtifactStagingServiceServicer(
+    beam_artifact_api_pb2_grpc.ArtifactStagingServiceServicer):
+
+  def __init__(self, temp_dir):
+    super(TestLocalFileSystemArtifactStagingServiceServicer, self).__init__()
+    self.temp_dir = temp_dir
+    self.manifest = None
+
+  def PutArtifact(self, request_iterator, context):
+    first = True
+    file_name = None
+    for request in request_iterator:
+      if first:
+        first = False
+        file_name = request.metadata.name
+      else:
+        with open(os.path.join(self.temp_dir, file_name), 'ab') as f:
+          f.write(request.data.data)
+
+    return beam_artifact_api_pb2.PutArtifactResponse()
+
+  def CommitManifest(self, request, context):
+    self.manifest = request.manifest
+    return beam_artifact_api_pb2.CommitManifestResponse(staging_token='token')
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
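
For readers skimming the diff, the request stream built by stage_artifact can be sketched without gRPC. This is only an illustration under stated assumptions: PutRequest is a hypothetical stand-in for beam_artifact_api_pb2.PutArtifactRequest, and receive() loosely mirrors what the test servicer does. Neither name is part of the PR.

```python
import io
from collections import namedtuple

# Hypothetical stand-in for PutArtifactRequest: each message carries either
# metadata (here just the artifact name) or a chunk of data, never both.
PutRequest = namedtuple('PutRequest', ['metadata', 'data'])

CHUNK_SIZE = 1 << 21  # 2 MB, the chunk size used by stage_artifact


def request_stream(artifact_name, fileobj, chunk_size=CHUNK_SIZE):
  """Yields one metadata message, then the file contents in chunks."""
  yield PutRequest(metadata=artifact_name, data=None)
  while True:
    chunk = fileobj.read(chunk_size)
    if not chunk:
      break
    yield PutRequest(metadata=None, data=chunk)


def receive(requests):
  """Reads the name from the first message and joins the remaining chunks."""
  requests = iter(requests)
  name = next(requests).metadata
  payload = b''.join(request.data for request in requests)
  return name, payload


# A 10-byte payload with a 4-byte chunk size gives chunks of 4, 4 and 2 bytes.
content = b'x' * 10
messages = list(
    request_stream('remote.txt', io.BytesIO(content), chunk_size=4))
name, payload = receive(messages)
```

The metadata-first ordering is what lets the receiving side pick a destination before any bytes arrive. Note that in this sketch an empty file produces only the metadata message.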


 



Issue Time Tracking
-------------------

    Worklog Id:     (was: 104863)
    Time Spent: 19h 10m  (was: 19h)

> Python SDK stages artifacts when talking to job server
> ------------------------------------------------------
>
>                 Key: BEAM-3883
>                 URL: https://issues.apache.org/jira/browse/BEAM-3883
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Ben Sidhom
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 19h 10m
>  Remaining Estimate: 0h
>
> The Python SDK does not currently stage its user-defined functions or 
> dependencies when talking to the job API. Artifacts that need to be staged 
> include the user code itself, any SDK components not included in the 
> container image, and the list of Python packages that must be installed at 
> runtime.
>  
> Artifacts that are currently expected can be found in the harness boot code: 
> [https://github.com/apache/beam/blob/58e3b06bee7378d2d8db1c8dd534b415864f63e1/sdks/python/container/boot.go#L52].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
