Repository: beam Updated Branches: refs/heads/master f55d00253 -> 9b6b9060b
Retry on correct error codes for datastoreio Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8e2522e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8e2522e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8e2522e Branch: refs/heads/master Commit: d8e2522eb04a2a0b5cb28415e55d467d8905d841 Parents: f55d002 Author: Vikas Kedigehalli <[email protected]> Authored: Wed May 3 13:14:20 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Wed May 3 14:32:59 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/io/gcp/datastore/v1/helper.py | 16 ++++++++----- .../io/gcp/datastore/v1/helper_test.py | 24 +++++++++++++++----- 2 files changed, 28 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d8e2522e/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py index d544226..a61884f 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py @@ -24,13 +24,13 @@ try: from google.cloud.proto.datastore.v1 import datastore_pb2 from google.cloud.proto.datastore.v1 import entity_pb2 from google.cloud.proto.datastore.v1 import query_pb2 + from google.rpc import code_pb2 from googledatastore import PropertyFilter, CompositeFilter from googledatastore import helper as datastore_helper from googledatastore.connection import Datastore from googledatastore.connection import RPCError - QUERY_NOT_FINISHED = query_pb2.QueryResultBatch.NOT_FINISHED except ImportError: - QUERY_NOT_FINISHED = None + pass # pylint: enable=wrong-import-order, wrong-import-position from apache_beam.internal.gcp import auth @@ -129,8 +129,12 @@ def make_partition(project, namespace): def retry_on_rpc_error(exception): """A retry filter for Cloud Datastore RPCErrors.""" if isinstance(exception, RPCError): - return exception.code >= 500 - # TODO(vikasrk): Figure out what other errors should be retried. + err_code = exception.code + # TODO(BEAM-2156): put these codes in a global list and use that instead. + return (err_code == code_pb2.DEADLINE_EXCEEDED or + err_code == code_pb2.UNAVAILABLE or + err_code == code_pb2.UNKNOWN or + err_code == code_pb2.INTERNAL) return False @@ -221,7 +225,6 @@ class QueryIterator(object): Entities are read in batches. Retries on failures. """ - _NOT_FINISHED = QUERY_NOT_FINISHED # Maximum number of results to request per query. _BATCH_SIZE = 500 @@ -265,4 +268,5 @@ class QueryIterator(object): # read). more_results = ((self._limit > 0) and ((num_results == self._BATCH_SIZE) or - (resp.batch.more_results == self._NOT_FINISHED))) + (resp.batch.more_results == + query_pb2.QueryResultBatch.NOT_FINISHED))) http://git-wip-us.apache.org/repos/asf/beam/blob/d8e2522e/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py index 582a5b3..5d4bb6f 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py @@ -33,6 +33,7 @@ try: from google.cloud.proto.datastore.v1 import entity_pb2 from google.cloud.proto.datastore.v1 import query_pb2 from google.cloud.proto.datastore.v1.entity_pb2 import Key + from google.rpc import code_pb2 from googledatastore.connection import RPCError from googledatastore import helper as datastore_helper except ImportError: @@ -49,19 +50,22 @@ class HelperTest(unittest.TestCase): self._query.kind.add().name = 'dummy_kind' patch_retry(self, helper) - def permanent_datastore_failure(self, req): - raise RPCError("dummy", 500, "failed") + def permanent_retriable_datastore_failure(self, req): + raise RPCError("dummy", code_pb2.UNAVAILABLE, "failed") - def transient_datastore_failure(self, req): + def transient_retriable_datastore_failure(self, req): if self._transient_fail_count: self._transient_fail_count -= 1 - raise RPCError("dummy", 500, "failed") + raise RPCError("dummy", code_pb2.INTERNAL, "failed") else: return datastore_pb2.RunQueryResponse() + def non_retriable_datastore_failure(self, req): + raise RPCError("dummy", code_pb2.UNAUTHENTICATED, "failed") + def test_query_iterator(self): self._mock_datastore.run_query.side_effect = ( - self.permanent_datastore_failure) + self.permanent_retriable_datastore_failure) query_iterator = helper.QueryIterator("project", None, self._query, self._mock_datastore) self.assertRaises(RPCError, iter(query_iterator).next) @@ -69,7 +73,7 @@ class HelperTest(unittest.TestCase): def test_query_iterator_with_transient_failures(self): self._mock_datastore.run_query.side_effect = ( - self.transient_datastore_failure) + self.transient_retriable_datastore_failure) query_iterator = helper.QueryIterator("project", None, self._query, self._mock_datastore) fail_count = 2 @@ -80,6 +84,14 @@ class HelperTest(unittest.TestCase): self.assertEqual(fail_count + 1, len(self._mock_datastore.run_query.call_args_list)) + def test_query_iterator_with_non_retriable_failures(self): + self._mock_datastore.run_query.side_effect = ( + self.non_retriable_datastore_failure) + query_iterator = helper.QueryIterator("project", None, self._query, + self._mock_datastore) + self.assertRaises(RPCError, iter(query_iterator).next) + self.assertEqual(1, len(self._mock_datastore.run_query.call_args_list)) + def test_query_iterator_with_single_batch(self): num_entities = 100 batch_size = 500
