Repository: beam
Updated Branches:
  refs/heads/master ae0de1bb5 -> 7e4719cd0


datastoreio: retry on socket errors


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/095e7916
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/095e7916
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/095e7916

Branch: refs/heads/master
Commit: 095e7916d23e49859acb42b9316ddf4222fbc9d9
Parents: ae0de1b
Author: Vikas Kedigehalli <[email protected]>
Authored: Thu Jul 13 10:29:23 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Mon Jul 17 09:16:06 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/io/gcp/datastore/v1/helper.py   |  8 +++++++
 .../io/gcp/datastore/v1/helper_test.py          | 22 ++++++++++++++++----
 2 files changed, 26 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/095e7916/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index f977536..996dace 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -19,6 +19,9 @@
 
 For internal use only; no backwards-compatibility guarantees.
 """
+
+import errno
+from socket import error as SocketError
 import sys
 
 # Protect against environments where datastore library is not available.
@@ -130,6 +133,11 @@ def retry_on_rpc_error(exception):
             err_code == code_pb2.UNAVAILABLE or
             err_code == code_pb2.UNKNOWN or
             err_code == code_pb2.INTERNAL)
+
+  if isinstance(exception, SocketError):
+    return (exception.errno == errno.ECONNRESET or
+            exception.errno == errno.ETIMEDOUT)
+
   return False
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/095e7916/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index a804c09..a8b1bb1 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -16,6 +16,9 @@
 #
 
 """Tests for datastore helper."""
+import errno
+import random
+from socket import error as SocketError
 import sys
 import unittest
 
@@ -49,6 +52,16 @@ class HelperTest(unittest.TestCase):
     self._query = query_pb2.Query()
     self._query.kind.add().name = 'dummy_kind'
     patch_retry(self, helper)
+    self._retriable_errors = [
+        RPCError("dummy", code_pb2.INTERNAL, "failed"),
+        SocketError(errno.ECONNRESET, "Connection Reset"),
+        SocketError(errno.ETIMEDOUT, "Timed out")
+    ]
+
+    self._non_retriable_errors = [
+        RPCError("dummy", code_pb2.UNAUTHENTICATED, "failed"),
+        SocketError(errno.EADDRNOTAVAIL, "Address not available")
+    ]
 
   def permanent_retriable_datastore_failure(self, req):
     raise RPCError("dummy", code_pb2.UNAVAILABLE, "failed")
@@ -56,12 +69,12 @@ class HelperTest(unittest.TestCase):
   def transient_retriable_datastore_failure(self, req):
     if self._transient_fail_count:
       self._transient_fail_count -= 1
-      raise RPCError("dummy", code_pb2.INTERNAL, "failed")
+      raise random.choice(self._retriable_errors)
     else:
       return datastore_pb2.RunQueryResponse()
 
   def non_retriable_datastore_failure(self, req):
-    raise RPCError("dummy", code_pb2.UNAUTHENTICATED, "failed")
+    raise random.choice(self._non_retriable_errors)
 
   def test_query_iterator(self):
     self._mock_datastore.run_query.side_effect = (
@@ -76,7 +89,7 @@ class HelperTest(unittest.TestCase):
         self.transient_retriable_datastore_failure)
     query_iterator = helper.QueryIterator("project", None, self._query,
                                           self._mock_datastore)
-    fail_count = 2
+    fail_count = 5
     self._transient_fail_count = fail_count
     for _ in query_iterator:
       pass
@@ -89,7 +102,8 @@ class HelperTest(unittest.TestCase):
         self.non_retriable_datastore_failure)
     query_iterator = helper.QueryIterator("project", None, self._query,
                                           self._mock_datastore)
-    self.assertRaises(RPCError, iter(query_iterator).next)
+    self.assertRaises(tuple(map(type, self._non_retriable_errors)),
+                      iter(query_iterator).next)
     self.assertEqual(1, len(self._mock_datastore.run_query.call_args_list))
 
   def test_query_iterator_with_single_batch(self):

Reply via email to