Fix a typo in query split error handling
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6afb906 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6afb906 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6afb906 Branch: refs/heads/python-sdk Commit: d6afb90690f5be2f3cb38d68dc1d49a1b551118e Parents: 1392f70 Author: Vikas Kedigehalli <vika...@google.com> Authored: Wed Dec 7 14:19:26 2016 -0800 Committer: Robert Bradshaw <rober...@google.com> Committed: Thu Dec 8 11:22:31 2016 -0800 ---------------------------------------------------------------------- .../apache_beam/io/datastore/v1/datastoreio.py | 2 +- .../io/datastore/v1/datastoreio_test.py | 29 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py index fc3e813..a86bb0b 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py @@ -181,7 +181,7 @@ class ReadFromDatastore(PTransform): except Exception: logging.warning("Unable to parallelize the given query: %s", query, exc_info=True) - query_splits = [(key, query)] + query_splits = [query] sharded_query_splits = [] for split_query in query_splits: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py index 2ac7ffb..f80a320 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py @@ -122,6 +122,35 @@ class DatastoreioTest(unittest.TestCase): self.assertEqual(1, len(returned_split_queries)) self.assertEqual(0, len(self._mock_datastore.method_calls)) + def test_SplitQueryFn_with_exception(self): + """A test that verifies that no split is performed when failures occur.""" + with patch.object(helper, 'get_datastore', + return_value=self._mock_datastore): + # Force SplitQueryFn to compute the number of query splits + num_splits = 0 + expected_num_splits = 1 + entity_bytes = (expected_num_splits * + ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES) + with patch.object(ReadFromDatastore, 'get_estimated_size_bytes', + return_value=entity_bytes): + + with patch.object(query_splitter, 'get_splits', + side_effect=ValueError("Testing query split error")): + split_query_fn = ReadFromDatastore.SplitQueryFn( + self._PROJECT, self._query, None, num_splits) + mock_context = MagicMock() + mock_context.element = self._query + split_query_fn.start_bundle(mock_context) + returned_split_queries = [] + for split_query in split_query_fn.process(mock_context): + returned_split_queries.append(split_query) + + self.assertEqual(len(returned_split_queries), expected_num_splits) + self.assertEqual(returned_split_queries[0][1], self._query) + self.assertEqual(0, + len(self._mock_datastore.run_query.call_args_list)) + self.verify_unique_keys(returned_split_queries) + def test_DatastoreWriteFn_with_emtpy_batch(self): self.check_DatastoreWriteFn(0)