[ https://issues.apache.org/jira/browse/BEAM-6145?focusedWorklogId=170576&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170576 ]

ASF GitHub Bot logged work on BEAM-6145:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Nov/18 06:24
            Start Date: 29/Nov/18 06:24
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #7156: [BEAM-6145] 
Revert "Merge pull request #7051 from markflyhigh/py-move-off-apitools-1"
URL: https://github.com/apache/beam/pull/7156
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 161b276ef5b1..18bb762e2acd 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -24,6 +24,7 @@
 from builtins import object
 import codecs
 import getpass
+import httplib2
 import json
 import logging
 import os
@@ -33,7 +34,6 @@
 import time
 from datetime import datetime
 import io
-import httplib2
 
 from past.builtins import unicode
 
@@ -59,15 +59,6 @@
 from apache_beam.transforms.display import DisplayData
 from apache_beam.utils import retry
 
-# Protect against environments where google storage library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from google.cloud import storage as gcloud_storage
-  from google.cloud.exceptions import GoogleCloudError
-except ImportError:
-  gcloud_storage = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
 # Environment version information. It is passed to the service during
 # a job submission and is used by the service to establish what features
 # are expected by the workers.
@@ -474,7 +465,8 @@ def _stage_resources(self, options):
         staging_location=google_cloud_options.staging_location)
     return resources
 
-  def stage_file(self, gcs_or_local_path, file_name, stream):
+  def stage_file(self, gcs_or_local_path, file_name, stream,
+                 mime_type='application/octet-stream'):
     """Stages a file at a GCS or local path with stream-supplied contents."""
     if not gcs_or_local_path.startswith('gs://'):
       local_path = FileSystems.join(gcs_or_local_path, file_name)
@@ -483,25 +475,27 @@ def stage_file(self, gcs_or_local_path, file_name, stream):
         f.write(stream.read())
       return
     gcs_location = FileSystems.join(gcs_or_local_path, file_name)
-    bucket_name, file_name = gcs_location[5:].split('/', 1)
+    bucket, name = gcs_location[5:].split('/', 1)
 
-    client = gcloud_storage.Client(project=self.google_cloud_options.project)
-    blob = client.get_bucket(bucket_name).blob(file_name)
+    request = storage.StorageObjectsInsertRequest(
+        bucket=bucket, name=name)
     logging.info('Starting GCS upload to %s...', gcs_location)
+    upload = storage.Upload(stream, mime_type)
     try:
-      blob.upload_from_file(stream)
-    except GoogleCloudError as e:
+      response = self._storage_client.objects.Insert(request, upload=upload)
+    except exceptions.HttpError as e:
       reportable_errors = {
           403: 'access denied',
           404: 'bucket not found',
       }
-      if e.code in reportable_errors:
+      if e.status_code in reportable_errors:
         raise IOError(('Could not upload to GCS path %s: %s. Please verify '
                        'that credentials are valid and that you have write '
                        'access to the specified path.') %
-                      (gcs_or_local_path, reportable_errors[e.code]))
+                      (gcs_or_local_path, reportable_errors[e.status_code]))
       raise
     logging.info('Completed GCS upload to %s', gcs_location)
+    return response
 
   @retry.no_retries  # Using no_retries marks this as an integration point.
   def create_job(self, job):
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index b0b9ab87bc1a..a4aec5199c98 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -142,7 +142,6 @@ def get_version():
     'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4',
     'googledatastore>=7.0.1,<7.1; python_version < "3.0"',
     'google-cloud-pubsub==0.35.4',
-    'google-cloud-storage==1.13.0',
     # GCP packages required by tests
     'google-cloud-bigquery>=1.6.0,<1.7.0',
 ]

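For context on what the revert restores: stage_file goes back from the
google-cloud-storage blob API to the apitools-based storage client,
reinstating the mime_type parameter and the returned insert response. Below
is a minimal sketch of the restored call pattern, assuming an
apitools-generated storage module and client mirroring the names in the diff;
the helper upload_to_gcs is hypothetical and error handling is trimmed:

import logging

from apitools.base.py import exceptions

def upload_to_gcs(storage, storage_client, gcs_location, stream,
                  mime_type='application/octet-stream'):
  # 'storage' is assumed to be the apitools-generated GCS client module and
  # 'storage_client' an instance of its client, as in the diff above.
  bucket, name = gcs_location[5:].split('/', 1)  # strip the gs:// prefix
  request = storage.StorageObjectsInsertRequest(bucket=bucket, name=name)
  upload = storage.Upload(stream, mime_type)
  try:
    return storage_client.objects.Insert(request, upload=upload)
  except exceptions.HttpError as e:
    # apitools reports HTTP failures via a status_code attribute.
    logging.error('Upload to %s failed (HTTP %s)', gcs_location,
                  e.status_code)
    raise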

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 170576)
    Time Spent: 50m  (was: 40m)

> PR 7051 causes user pipeline to break due to new version of google-cloud-core
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-6145
>                 URL: https://issues.apache.org/jira/browse/BEAM-6145
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.9.0
>            Reporter: Charles Chen
>            Assignee: Charles Chen
>            Priority: Blocker
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/pull/7051] introduced a new dependency
> (google-cloud-storage==1.13.0), which made user pipelines incompatible with
> the new version of google-cloud-core that it pulls in.
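
To make the reported failure mode concrete: google-cloud-storage==1.13.0 (the
pin removed above) depends on google-cloud-core, so installing it next to
other pinned GCP packages can yield a version conflict in a user's
environment. Here is a hypothetical sketch of surfacing such conflicts with
setuptools' pkg_resources; the requirement list is illustrative, not taken
from this issue:

import pkg_resources

def report_conflicts(requirements):
  # Resolve each requirement against the installed distributions and report
  # clashes such as an incompatible google-cloud-core version.
  for req in requirements:
    try:
      pkg_resources.require(req)
    except pkg_resources.VersionConflict as e:
      print('Version conflict for %s: %s' % (req, e))
    except pkg_resources.DistributionNotFound as e:
      print('Missing distribution for %s: %s' % (req, e))

report_conflicts(['google-cloud-storage==1.13.0',
                  'google-cloud-pubsub==0.35.4'])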



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
