[ https://issues.apache.org/jira/browse/BEAM-6033?focusedWorklogId=174332&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174332 ]

ASF GitHub Bot logged work on BEAM-6033:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Dec/18 22:44
            Start Date: 11/Dec/18 22:44
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #7032: [BEAM-6033] normalize httplib2.Http initialization and usage
URL: https://github.com/apache/beam/pull/7032

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/python/apache_beam/internal/http_client.py b/sdks/python/apache_beam/internal/http_client.py
new file mode 100644
index 000000000000..d9e4f7233794
--- /dev/null
+++ b/sdks/python/apache_beam/internal/http_client.py
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utility functions used for creating a common Http client from httplib2.
+
+For internal use only. No backwards compatibility guarantees.
+"""
+from __future__ import absolute_import
+
+import logging
+import os
+import re
+
+import httplib2
+
+# This is the number of seconds the library will wait for GCS operations to
+# complete.
+DEFAULT_HTTP_TIMEOUT_SECONDS = 60
+
+
+def proxy_info_from_environment_var(proxy_env_var):
+  """Reads proxy info from the environment and converts to httplib2.ProxyInfo.
+
+  Args:
+    proxy_env_var: environment variable string to read, http_proxy or
+       https_proxy (in lower case).
+       Example: http://myproxy.domain.com:8080
+
+  Returns:
+    httplib2.ProxyInfo constructed from the environment string.
+  """
+  proxy_url = os.environ.get(proxy_env_var)
+  if not proxy_url:
+    return None
+  proxy_protocol = proxy_env_var.lower().split('_')[0]
+  if not re.match('^https?://', proxy_url, flags=re.IGNORECASE):
+    logging.warn("proxy_info_from_url requires a protocol, which is always "
+                 "http or https.")
+    proxy_url = proxy_protocol + '://' + proxy_url
+  return httplib2.proxy_info_from_url(proxy_url, method=proxy_protocol)
+
+
+def get_new_http():
+  """Creates and returns a new httplib2.Http instance.
+
+  Returns:
+    An initialized httplib2.Http instance.
+  """
+  proxy_info = None
+  for proxy_env_var in ['http_proxy', 'https_proxy']:
+    if os.environ.get(proxy_env_var):
+      proxy_info = proxy_info_from_environment_var(proxy_env_var)
+      break
+  # Use a non-infinite SSL timeout to avoid hangs during network flakiness.
+  return httplib2.Http(proxy_info=proxy_info,
+                       timeout=DEFAULT_HTTP_TIMEOUT_SECONDS)
diff --git a/sdks/python/apache_beam/internal/http_client_test.py b/sdks/python/apache_beam/internal/http_client_test.py
new file mode 100644
index 000000000000..460b1db1be71
--- /dev/null
+++ b/sdks/python/apache_beam/internal/http_client_test.py
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the http_client module."""
+from __future__ import absolute_import
+
+import os
+import unittest
+
+from httplib2 import ProxyInfo
+
+from apache_beam.internal.http_client import DEFAULT_HTTP_TIMEOUT_SECONDS
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.internal.http_client import proxy_info_from_environment_var
+
+
+class HttpClientTest(unittest.TestCase):
+
+  def test_proxy_from_env_http_with_port(self):
+    os.environ['http_proxy'] = 'http://localhost:9000'
+    proxy_info = proxy_info_from_environment_var('http_proxy')
+    expected = ProxyInfo(3, 'localhost', 9000)
+    self.assertEquals(str(expected), str(proxy_info))
+
+  def test_proxy_from_env_https_with_port(self):
+    os.environ['https_proxy'] = 'https://localhost:9000'
+    proxy_info = proxy_info_from_environment_var('https_proxy')
+    expected = ProxyInfo(3, 'localhost', 9000)
+    self.assertEquals(str(expected), str(proxy_info))
+
+  def test_proxy_from_env_http_without_port(self):
+    os.environ['http_proxy'] = 'http://localhost'
+    proxy_info = proxy_info_from_environment_var('http_proxy')
+    expected = ProxyInfo(3, 'localhost', 80)
+    self.assertEquals(str(expected), str(proxy_info))
+
+  def test_proxy_from_env_https_without_port(self):
+    os.environ['https_proxy'] = 'https://localhost'
+    proxy_info = proxy_info_from_environment_var('https_proxy')
+    expected = ProxyInfo(3, 'localhost', 443)
+    self.assertEquals(str(expected), str(proxy_info))
+
+  def test_proxy_from_env_http_without_method(self):
+    os.environ['http_proxy'] = 'localhost:8000'
+    proxy_info = proxy_info_from_environment_var('http_proxy')
+    expected = ProxyInfo(3, 'localhost', 8000)
+    self.assertEquals(str(expected), str(proxy_info))
+
+  def test_proxy_from_env_https_without_method(self):
+    os.environ['https_proxy'] = 'localhost:8000'
+    proxy_info = proxy_info_from_environment_var('https_proxy')
+    expected = ProxyInfo(3, 'localhost', 8000)
+    self.assertEquals(str(expected), str(proxy_info))
+
+  def test_proxy_from_env_http_without_port_without_method(self):
+    os.environ['http_proxy'] = 'localhost'
+    proxy_info = proxy_info_from_environment_var('http_proxy')
+    expected = ProxyInfo(3, 'localhost', 80)
+    self.assertEquals(str(expected), str(proxy_info))
+
+  def test_proxy_from_env_https_without_port_without_method(self):
+    os.environ['https_proxy'] = 'localhost'
+    proxy_info = proxy_info_from_environment_var('https_proxy')
+    expected = ProxyInfo(3, 'localhost', 443)
+    self.assertEquals(str(expected), str(proxy_info))
+
+  def test_proxy_from_env_invalid_var(self):
+    proxy_info = proxy_info_from_environment_var('http_proxy_host')
+    expected = None
+    self.assertEquals(str(expected), str(proxy_info))
+
+  def test_proxy_from_env_wrong_method_in_var_name(self):
+    os.environ['smtp_proxy'] = 'localhost'
+    with self.assertRaises(KeyError):
+      proxy_info_from_environment_var('smtp_proxy')
+
+  def test_proxy_from_env_wrong_method_in_url(self):
+    os.environ['http_proxy'] = 'smtp://localhost:8000'
+    proxy_info = proxy_info_from_environment_var('http_proxy')
+    expected = ProxyInfo(3, 'smtp', 80) # wrong proxy info generated
+    self.assertEquals(str(expected), str(proxy_info))
+
+  def test_get_new_http_proxy_info(self):
+    os.environ['http_proxy'] = 'localhost'
+    http = get_new_http()
+    expected = ProxyInfo(3, 'localhost', 80)
+    self.assertEquals(str(http.proxy_info), str(expected))
+
+  def test_get_new_http_timeout(self):
+    http = get_new_http()
+    self.assertEquals(http.timeout, DEFAULT_HTTP_TIMEOUT_SECONDS)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index f8f9d80ea157..099d7999de3d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -134,6 +134,7 @@
 from apache_beam.internal.gcp import auth
 from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
@@ -791,6 +792,7 @@ class BigQueryWrapper(object):
 
   def __init__(self, client=None):
     self.client = client or bigquery.BigqueryV2(
+        http=get_new_http(),
         credentials=auth.get_service_credentials())
     self._unique_row_id = 0
     # For testing scenarios where we pass in a client we do not want a
@@ -1280,7 +1282,7 @@ class BigQueryWriteFn(DoFn):
   """
 
   def __init__(self, table_id, dataset_id, project_id, batch_size, schema,
-               create_disposition, write_disposition, client):
+               create_disposition, write_disposition, test_client):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -1316,7 +1318,7 @@ def __init__(self, table_id, dataset_id, project_id, batch_size, schema,
     self.dataset_id = dataset_id
     self.project_id = project_id
     self.schema = schema
-    self.client = client
+    self.test_client = test_client
     self.create_disposition = create_disposition
     self.write_disposition = write_disposition
     self._rows_buffer = []
@@ -1353,7 +1355,7 @@ def start_bundle(self):
     self._rows_buffer = []
     self.table_schema = self.get_table_schema(self.schema)
 
-    self.bigquery_wrapper = BigQueryWrapper(client=self.client)
+    self.bigquery_wrapper = BigQueryWrapper(client=self.test_client)
     self.bigquery_wrapper.get_or_create_table(
         self.project_id, self.dataset_id, self.table_id, self.table_schema,
         self.create_disposition, self.write_disposition)
@@ -1533,7 +1535,7 @@ def expand(self, pcoll):
         schema=self.get_dict_table_schema(self.schema),
         create_disposition=self.create_disposition,
         write_disposition=self.write_disposition,
-        client=self.test_client)
+        test_client=self.test_client)
     return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn)
 
   def display_data(self):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index ffd6a6f04748..9bbe6df41e0f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -871,7 +871,7 @@ def test_dofn_client_start_bundle_called(self):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
-        client=client)
+        test_client=client)
 
     fn.start_bundle()
     self.assertTrue(client.tables.Get.called)
@@ -895,7 +895,7 @@ def test_dofn_client_start_bundle_create_called(self):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
-        client=client)
+        test_client=client)
 
     fn.start_bundle()
     self.assertTrue(client.tables.Get.called)
@@ -921,7 +921,7 @@ def test_dofn_client_process_performs_batching(self):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
-        client=client)
+        test_client=client)
 
     fn.start_bundle()
     fn.process({'month': 1})
@@ -950,7 +950,7 @@ def test_dofn_client_process_flush_called(self):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
-        client=client)
+        test_client=client)
 
     fn.start_bundle()
     fn.process({'month': 1})
@@ -979,7 +979,7 @@ def test_dofn_client_finish_bundle_flush_called(self):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
-        client=client)
+        test_client=client)
 
     fn.start_bundle()
     fn.process({'month': 1})
@@ -1012,7 +1012,7 @@ def test_dofn_client_no_records(self):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
-        client=client)
+        test_client=client)
 
     fn.start_bundle()
     self.assertTrue(client.tables.Get.called)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 2ca711024945..225d7915eb07 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -26,7 +26,6 @@
 import io
 import logging
 import multiprocessing
-import os
 import re
 import sys
 import threading
@@ -34,8 +33,7 @@
 import traceback
 from builtins import object
 
-import httplib2
-
+from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.filesystemio import Downloader
 from apache_beam.io.filesystemio import DownloaderStream
 from apache_beam.io.filesystemio import PipeStream
@@ -77,10 +75,6 @@
 # +---------------+------------+-------------+-------------+-------------+
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
 
-# This is the number of seconds the library will wait for GCS operations to
-# complete.
-DEFAULT_HTTP_TIMEOUT_SECONDS = 60
-
 # This is the number of seconds the library will wait for a partial-file read
 # operation from GCS to complete before retrying.
 DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
@@ -105,44 +99,6 @@
 GCS_BATCH_ENDPOINT = 'https://www.googleapis.com/batch/storage/v1'
 
 
-def proxy_info_from_environment_var(proxy_env_var):
-  """Reads proxy info from the environment and converts to httplib2.ProxyInfo.
-
-  Args:
-    proxy_env_var: environment variable string to read, http_proxy or
-       https_proxy (in lower case).
-       Example: http://myproxy.domain.com:8080
-
-  Returns:
-    httplib2.ProxyInfo constructed from the environment string.
-  """
-  proxy_url = os.environ.get(proxy_env_var)
-  if not proxy_url:
-    return None
-  proxy_protocol = proxy_env_var.lower().split('_')[0]
-  if not re.match('^https?://', proxy_url, flags=re.IGNORECASE):
-    logging.warn("proxy_info_from_url requires a protocol, which is always "
-                 "http or https.")
-    proxy_url = proxy_protocol + '://' + proxy_url
-  return httplib2.proxy_info_from_url(proxy_url, method=proxy_protocol)
-
-
-def get_new_http():
-  """Creates and returns a new httplib2.Http instance.
-
-  Returns:
-    An initialized httplib2.Http instance.
-  """
-  proxy_info = None
-  for proxy_env_var in ['http_proxy', 'https_proxy']:
-    if os.environ.get(proxy_env_var):
-      proxy_info = proxy_info_from_environment_var(proxy_env_var)
-      break
-  # Use a non-infinite SSL timeout to avoid hangs during network flakiness.
-  return httplib2.Http(proxy_info=proxy_info,
-                       timeout=DEFAULT_HTTP_TIMEOUT_SECONDS)
-
-
 def parse_gcs_path(gcs_path, object_optional=False):
   """Return the bucket and object names of the given gs:// path."""
   match = re.match('^gs://([^/]+)/(.*)$', gcs_path)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 18bb762e2acd..2420186e7be1 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -21,10 +21,9 @@
 
 from __future__ import absolute_import
 
-from builtins import object
 import codecs
 import getpass
-import httplib2
+import io
 import json
 import logging
 import os
@@ -33,8 +32,8 @@
 import tempfile
 import time
 from datetime import datetime
-import io
 
+from builtins import object
 from past.builtins import unicode
 
 from apitools.base.py import encoding
@@ -43,6 +42,7 @@
 from apache_beam import version as beam_version
 from apache_beam.internal.gcp.auth import get_service_credentials
 from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.options.pipeline_options import DebugOptions
@@ -54,8 +54,8 @@
 from apache_beam.runners.dataflow.internal.names import PropertyNames
 from apache_beam.runners.internal import names as shared_names
 from apache_beam.runners.portability.stager import Stager
-from apache_beam.transforms import cy_combiners
 from apache_beam.transforms import DataflowDistributionCounter
+from apache_beam.transforms import cy_combiners
 from apache_beam.transforms.display import DisplayData
 from apache_beam.utils import retry
 
@@ -429,8 +429,7 @@ def __init__(self, options):
     else:
       credentials = get_service_credentials()
 
-    # Use 60 second socket timeout avoid hangs during network flakiness.
-    http_client = httplib2.Http(timeout=60)
+    http_client = get_new_http()
     self._client = dataflow.DataflowV1b3(
         url=self.google_cloud_options.dataflow_endpoint,
         credentials=credentials,
diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py
index 1a7a90fcb35a..d4fdfeabf96a 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -56,6 +56,7 @@
 import pkg_resources
 
 from apache_beam.internal import pickler
+from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import WorkerOptions
@@ -286,7 +287,7 @@ def _download_file(from_url, to_path):
         # even for a 404 response (file will contain the contents of the 404
         # response).
         # TODO(angoenka): Extract and use the filename when downloading file.
-        response, content = __import__('httplib2').Http().request(from_url)
+        response, content = get_new_http().request(from_url)
         if int(response['status']) >= 400:
           raise RuntimeError(
               'Artifact not found at %s (response: %s)' % (from_url, response))
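
The scheme-prefixing step that proxy_info_from_environment_var performs in
the diff above can be sketched with the standard library alone. This is an
illustration under stated assumptions, not the code merged in the PR:
normalize_proxy_url and host_and_port are hypothetical names, and the
default ports (80 for http, 443 for https) are taken from the expectations
in http_client_test.py.

```python
import re
from urllib.parse import urlsplit


def normalize_proxy_url(env_var_name, proxy_url):
    """Hypothetical helper: prefix a scheme when the proxy value has none.

    The scheme is derived from the variable name ('http_proxy' -> 'http'),
    mirroring the split('_')[0] step in the diff.
    """
    protocol = env_var_name.lower().split('_')[0]
    if not re.match(r'^https?://', proxy_url, flags=re.IGNORECASE):
        proxy_url = protocol + '://' + proxy_url
    return proxy_url


def host_and_port(url):
    """Hypothetical helper: return (host, port), defaulting the port by scheme."""
    parts = urlsplit(url)
    default_port = 443 if parts.scheme.lower() == 'https' else 80
    return parts.hostname, parts.port or default_port


if __name__ == '__main__':
    for name, value in [('http_proxy', 'localhost:8000'),
                        ('https_proxy', 'localhost')]:
        print(name, host_and_port(normalize_proxy_url(name, value)))
```

A value that already carries an http or https scheme passes through
unchanged, which matches the with-port and without-port test cases in the
diff.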


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 174332)
    Time Spent: 2h 50m  (was: 2h 40m)

> normalize httplib2.Http initialization and usage
> ------------------------------------------------
>
>                 Key: BEAM-6033
>                 URL: https://issues.apache.org/jira/browse/BEAM-6033
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Heejong Lee
>            Priority: Major
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Ideally solve both issues below in one PR, but issue 1 has priority as it can halt a pipeline.
>
> Issue 1:
> Datastore client (and other httplib2-based clients for GCS, Dataflow, BigQuery, etc.) doesn't set a socket timeout.
> This can cause _flush_batch() in datastoreio.py to block forever waiting for a response.
> This issue is very similar to https://issues.apache.org/jira/browse/BEAM-5915 and the solution should be similar.
>
> Issue 2:
> Standardize use of proxy environment settings, as in gcsio:
> https://github.com/apache/beam/blob/8d3389df78aa2e0a0de06b7c5743ca3530dec4ac/sdks/python/apache_beam/io/gcp/gcsio.py#L136
> Issue for proxy settings: https://issues.apache.org/jira/browse/BEAM-3184
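
Issue 1 can be illustrated without any Beam or httplib2 code. The sketch
below is an illustration under stated assumptions, not part of the PR:
read_with_timeout is a hypothetical helper that connects to a local
listening socket that never replies, which is the situation a client
without a socket timeout would wait on indefinitely.

```python
import socket


def read_with_timeout(timeout_seconds):
    """Hypothetical helper: try to read from a peer that never sends data."""
    server = socket.socket()
    server.bind(('127.0.0.1', 0))  # port 0 lets the OS pick a free port
    server.listen(1)
    # The connection completes via the listen backlog; the server never
    # accepts it or sends anything, so recv() can only end by timing out.
    client = socket.create_connection(server.getsockname(),
                                      timeout=timeout_seconds)
    try:
        client.recv(1)
        return 'received'
    except socket.timeout:
        return 'timed out'
    finally:
        client.close()
        server.close()


if __name__ == '__main__':
    print(read_with_timeout(0.2))
```

With timeout=None the recv() call would block with no upper bound, which
is why the PR routes every client through one factory that always sets a
timeout.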



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
