Repository: beam Updated Branches: refs/heads/master ae0de1bb5 -> 7e4719cd0
datastoreio: retry on socket errors Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/095e7916 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/095e7916 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/095e7916 Branch: refs/heads/master Commit: 095e7916d23e49859acb42b9316ddf4222fbc9d9 Parents: ae0de1b Author: Vikas Kedigehalli <[email protected]> Authored: Thu Jul 13 10:29:23 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Mon Jul 17 09:16:06 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/io/gcp/datastore/v1/helper.py | 8 +++++++ .../io/gcp/datastore/v1/helper_test.py | 22 ++++++++++++++++---- 2 files changed, 26 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/095e7916/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index f977536..996dace 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -19,6 +19,9 @@ For internal use only; no backwards-compatibility guarantees. """ + +import errno +from socket import error as SocketError import sys # Protect against environments where datastore library is not available. @@ -130,6 +133,11 @@ def retry_on_rpc_error(exception): err_code == code_pb2.UNAVAILABLE or err_code == code_pb2.UNKNOWN or err_code == code_pb2.INTERNAL) + + if isinstance(exception, SocketError): + return (exception.errno == errno.ECONNRESET or + exception.errno == errno.ETIMEDOUT) + return False http://git-wip-us.apache.org/repos/asf/beam/blob/095e7916/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py index a804c09..a8b1bb1 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py @@ -16,6 +16,9 @@ # """Tests for datastore helper.""" +import errno +import random +from socket import error as SocketError import sys import unittest @@ -49,6 +52,16 @@ class HelperTest(unittest.TestCase): self._query = query_pb2.Query() self._query.kind.add().name = 'dummy_kind' patch_retry(self, helper) + self._retriable_errors = [ + RPCError("dummy", code_pb2.INTERNAL, "failed"), + SocketError(errno.ECONNRESET, "Connection Reset"), + SocketError(errno.ETIMEDOUT, "Timed out") + ] + + self._non_retriable_errors = [ + RPCError("dummy", code_pb2.UNAUTHENTICATED, "failed"), + SocketError(errno.EADDRNOTAVAIL, "Address not available") + ] def permanent_retriable_datastore_failure(self, req): raise RPCError("dummy", code_pb2.UNAVAILABLE, "failed") @@ -56,12 +69,12 @@ class HelperTest(unittest.TestCase): def transient_retriable_datastore_failure(self, req): if self._transient_fail_count: self._transient_fail_count -= 1 - raise RPCError("dummy", code_pb2.INTERNAL, "failed") + raise random.choice(self._retriable_errors) else: return datastore_pb2.RunQueryResponse() def non_retriable_datastore_failure(self, req): - raise RPCError("dummy", code_pb2.UNAUTHENTICATED, "failed") + raise random.choice(self._non_retriable_errors) def test_query_iterator(self): self._mock_datastore.run_query.side_effect = ( @@ -76,7 +89,7 @@ class HelperTest(unittest.TestCase): self.transient_retriable_datastore_failure) query_iterator = helper.QueryIterator("project", None, self._query, self._mock_datastore) - fail_count = 2 + fail_count = 5 self._transient_fail_count = fail_count for _ in query_iterator: pass @@ -89,7 +102,8 @@ class HelperTest(unittest.TestCase): self.non_retriable_datastore_failure) query_iterator = helper.QueryIterator("project", None, self._query, self._mock_datastore) - self.assertRaises(RPCError, iter(query_iterator).next) + self.assertRaises(tuple(map(type, self._non_retriable_errors)), + iter(query_iterator).next) self.assertEqual(1, len(self._mock_datastore.run_query.call_args_list)) def test_query_iterator_with_single_batch(self):
