[ 
https://issues.apache.org/jira/browse/BEAM-5788?focusedWorklogId=169556&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-169556
 ]

ASF GitHub Bot logged work on BEAM-5788:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Nov/18 23:23
            Start Date: 26/Nov/18 23:23
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #7010: [BEAM-5788] Fix DataflowRunner in Python3 - response encoding
URL: https://github.com/apache/beam/pull/7010
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py 
b/sdks/python/apache_beam/io/gcp/gcsio.py
index bd6d96f3dd3d..2ca711024945 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -28,6 +28,7 @@
 import multiprocessing
 import os
 import re
+import sys
 import threading
 import time
 import traceback
@@ -173,7 +174,8 @@ def __new__(cls, storage_client=None):
         storage_client = storage.StorageV1(
             credentials=credentials,
             get_credentials=False,
-            http=get_new_http())
+            http=get_new_http(),
+            response_encoding=None if sys.version_info[0] < 3 else 'utf8')
         local_state.gcsio_instance = super(GcsIO, cls).__new__(cls)
         local_state.gcsio_instance.client = storage_client
       return local_state.gcsio_instance
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index cd98bea50c19..161b276ef5b1 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -28,6 +28,7 @@
 import logging
 import os
 import re
+import sys
 import tempfile
 import time
 from datetime import datetime
@@ -443,12 +444,14 @@ def __init__(self, options):
         url=self.google_cloud_options.dataflow_endpoint,
         credentials=credentials,
         get_credentials=(not self.google_cloud_options.no_auth),
-        http=http_client)
+        http=http_client,
+        response_encoding=get_response_encoding())
     self._storage_client = storage.StorageV1(
         url='https://www.googleapis.com/storage/v1',
         credentials=credentials,
         get_credentials=(not self.google_cloud_options.no_auth),
-        http=http_client)
+        http=http_client,
+        response_encoding=get_response_encoding())
 
   # TODO(silviuc): Refactor so that retry logic can be applied.
   @retry.no_retries  # Using no_retries marks this as an integration point.
@@ -907,6 +910,11 @@ def get_runner_harness_container_image():
   return None
 
 
+def get_response_encoding():
+  """Encoding to use to decode HTTP response from Google APIs."""
+  return None if sys.version_info[0] < 3 else 'utf8'
+
+
 # To enable a counter on the service, add it to this dictionary.
 structured_counter_translations = {
     cy_combiners.CountCombineFn: (


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 169556)
    Time Spent: 4h 50m  (was: 4h 40m)

> wordcount_fnapi_it failed on TestDataflowRunner because of JSON string 
> decoding error
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-5788
>                 URL: https://issues.apache.org/jira/browse/BEAM-5788
>             Project: Beam
>          Issue Type: Sub-task
>          Components: test-failures
>            Reporter: Mark Liu
>            Assignee: Mark Liu
>            Priority: Major
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
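> Similar to BEAM-5785, wordcount_fnapi_it failed on Python 3 when running with
> TestDataflowRunner, raising TypeError: the JSON object must be str, not 'bytes'.
> The HTTP response body reaches json.loads as bytes, which Python 3.5 rejects.
> Because the failing call is wrapped in retry logic, this error causes infinite
> retries before the job can be submitted to the service.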
> More details about my env and test:
> Python version: 3.5.3
> Test: apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_fnapi_it
> Command:
> {code}
> python setup.py nosetests \
>   --tests apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_fnapi_it \
>   --nocapture \
>   --nologcapture \
>   --test-pipeline-options=" \
>         --runner=TestDataflowRunner \
>         --project=<my_project> \
>         --staging_location=<my_staging> \
>         --temp_location=<my_temp> \
>         --output=<my_output> \
>         --sdk_location=.../beam/sdks/python/dist/apache-beam-2.9.0.dev0.tar.gz \
>         --num_workers=1"
> {code}
> Stacktrace:
> {code}
> WARNING:root:Retry with exponential backoff: waiting for 7.661876827680761 
> seconds before retrying exists because we caught exception: TypeError: the 
> JSON object must be str, not 'bytes'
>  Traceback for above exception (most recent call last):
>   File ".../beam/sdks/python/apache_beam/utils/retry.py", line 184, in wrapper
>     return fun(*args, **kwargs)
>   File ".../beam/sdks/python/apache_beam/io/gcp/gcsio.py", line 375, in exists
>     self.client.objects.Get(request)  # metadata
>   File 
> ".../beam/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py",
>  line 955, in Get
>     download=download)
>   File 
> ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/base_api.py",
>  line 722, in _RunMethod
>     return self.ProcessHttpResponse(method_config, http_response, request)
>   File 
> ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/base_api.py",
>  line 728, in ProcessHttpResponse
>     self.__ProcessHttpResponse(method_config, http_response, request))
>   File 
> ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/base_api.py",
>  line 611, in __ProcessHttpResponse
>     response_type, http_response.content)
>   File 
> ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/base_api.py",
>  line 442, in DeserializeMessage
>     message = encoding.JsonToMessage(response_type, data)
>   File 
> ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/encoding.py",
>  line 104, in JsonToMessage
>     return _ProtoJsonApiTools.Get().decode_message(message_type, message)
>   File 
> ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/encoding.py",
>  line 290, in decode_message
>     message_type, result)
>   File 
> ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/protorpclite/protojson.py",
>  line 210, in decode_message
>     dictionary = json.loads(encoded_message)
>   File "/usr/lib/python3.5/json/__init__.py", line 312, in loads
>     s.__class__.__name__))
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to