This is an automated email from the ASF dual-hosted git repository.
yifanzou pushed a commit to branch release-2.15.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.15.0 by this push:
new 4b30553 [BEAM-7860] Python Datastore: fix key sort order
4b30553 is described below
commit 4b30553da2db7bc1559bfeee6dd7059c9591e404
Author: Udi Meiri <[email protected]>
AuthorDate: Fri Aug 2 19:04:53 2019 -0700
[BEAM-7860] Python Datastore: fix key sort order
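This fixes a regression from the v1 client: v1new sorted keys by the string form of each path element, so numeric IDs were compared as strings (e.g. 10 sorted before 9) and IDs were not reliably ordered before names.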
---
.../io/gcp/datastore/v1/query_splitter_test.py | 11 +++--
.../io/gcp/datastore/v1new/query_splitter.py | 56 +++++++++++++++++++++-
.../io/gcp/datastore/v1new/query_splitter_test.py | 51 ++++++++++++++++++--
3 files changed, 110 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
index c327e6a..80c66ae 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
@@ -171,11 +171,16 @@ class QuerySplitterTest(unittest.TestCase):
batch_size: the number of entities returned by fake datastore in one req.
"""
- # Test for both random long ids and string ids.
- id_or_name = [True, False]
+ # Test for random long ids, string ids, and a mix of both.
+ id_or_name = [True, False, None]
for id_type in id_or_name:
- entities = fake_datastore.create_entities(num_entities, id_type)
+ if id_type is None:
+ entities = fake_datastore.create_entities(num_entities, False)
+ entities.extend(fake_datastore.create_entities(num_entities, True))
+ num_entities *= 2
+ else:
+ entities = fake_datastore.create_entities(num_entities, id_type)
mock_datastore = MagicMock()
# Assign a fake run_query method as a side_effect to the mock.
mock_datastore.run_query.side_effect = \
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py
index 4f8be83..82a109f 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter.py
@@ -26,6 +26,9 @@ from __future__ import division
from builtins import range
from builtins import round
+from past.builtins import long
+from past.builtins import unicode
+
from apache_beam.io.gcp.datastore.v1new import types
__all__ = ['QuerySplitterError', 'SplitNotPossibleError', 'get_splits']
@@ -123,10 +126,59 @@ def _create_scatter_query(query, num_splits):
return scatter_query
+class IdOrName(object):
+  """Represents an ID or name of a Datastore key.
+
+  Implements sort ordering: by ID, then by name, keys with IDs before those
+  with names.
+
+  Attributes:
+    id_or_name: the value passed to the constructor.
+    id: the integer ID, or None if this instance holds a name.
+    name: the string name, or None if this instance holds an ID.
+  """
+  def __init__(self, id_or_name):
+    """Wraps ``id_or_name``.
+
+    Raises:
+      TypeError: if ``id_or_name`` is neither a string nor an integer.
+    """
+    self.id_or_name = id_or_name
+    if isinstance(id_or_name, (str, unicode)):
+      self.id = None
+      self.name = id_or_name
+    elif isinstance(id_or_name, (int, long)):
+      self.id = id_or_name
+      self.name = None
+    else:
+      # Wrap in a tuple so that tuple values do not break the % formatting.
+      raise TypeError('Unexpected type of id_or_name: %s' % (id_or_name,))
+
+  def __lt__(self, other):
+    # Return NotImplemented instead of delegating to object: object has no
+    # __lt__ on Python 2, so super() delegation would raise AttributeError.
+    if not isinstance(other, IdOrName):
+      return NotImplemented
+
+    if self.id is not None:
+      if other.id is None:
+        return True  # IDs sort before names.
+      return self.id < other.id
+
+    if other.id is not None:
+      return False  # Names sort after IDs.
+
+    return self.name < other.name
+
+  def __eq__(self, other):
+    if not isinstance(other, IdOrName):
+      return NotImplemented
+    return self.id == other.id and self.name == other.name
+
+  def __ne__(self, other):
+    # Python 2 does not derive __ne__ from __eq__.
+    result = self.__eq__(other)
+    if result is NotImplemented:
+      return result
+    return not result
+
+  def __hash__(self):
+    # Must agree with __eq__, which compares (id, name).
+    return hash((self.id, self.name))
+
+  def __repr__(self):
+    return 'IdOrName(%r)' % (self.id_or_name,)
+
+
+def client_key_sort_key(client_key):
+  """Key function for sorting lists of ``google.cloud.datastore.key.Key``.
+
+  Returns ``[project, namespace, kind, IdOrName, kind, IdOrName, ...]``. Lists
+  compare element-wise, so keys order by project, then namespace, then path,
+  and at each path level IDs sort before names (see ``IdOrName``).
+  """
+  path = list(client_key.flat_path)
+  # flat_path alternates kind and id_or_name; the last kind may have no
+  # id_or_name following it.
+  kinds = path[0::2]
+  ids_or_names = path[1::2]
+  result = [client_key.project, client_key.namespace or '']
+  for index, kind in enumerate(kinds):
+    result.append(kind)
+    if index < len(ids_or_names):
+      result.append(IdOrName(ids_or_names[index]))
+  return result
def _get_scatter_keys(client, query, num_splits):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py
index 3e30859..7f3d1ed 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/query_splitter_test.py
@@ -103,9 +103,16 @@ class QuerySplitterTest(QuerySplitterTestBase):
unused_batch_size: ignored in v1new since query results are entirely
handled by the Datastore client.
"""
- # Test for both random long ids and string ids.
- for id_or_name in [True, False]:
- client_entities = helper.create_client_entities(num_entities, id_or_name)
+ # Test for random long ids, string ids, and a mix of both.
+ for id_or_name in [True, False, None]:
+ if id_or_name is None:
+ client_entities = helper.create_client_entities(num_entities, False)
+ client_entities.extend(helper.create_client_entities(num_entities,
+ True))
+ num_entities *= 2
+ else:
+ client_entities = helper.create_client_entities(num_entities,
+ id_or_name)
mock_client = mock.MagicMock()
mock_client_query = mock.MagicMock()
@@ -154,6 +161,19 @@ class QuerySplitterTest(QuerySplitterTestBase):
if lt_key is None:
last_query_seen = True
+  def test_id_or_name(self):
+    # Attribute population for integer IDs and string names.
+    id_ = query_splitter.IdOrName(1)
+    self.assertEqual(1, id_.id)
+    self.assertIsNone(id_.name)
+    name = query_splitter.IdOrName('1')
+    self.assertIsNone(name.id)
+    self.assertEqual('1', name.name)
+    # Equality.
+    self.assertEqual(query_splitter.IdOrName(1), query_splitter.IdOrName(1))
+    self.assertEqual(query_splitter.IdOrName('1'),
+                     query_splitter.IdOrName('1'))
+    self.assertNotEqual(query_splitter.IdOrName(1),
+                        query_splitter.IdOrName('1'))
+    # Ordering: IDs before names, then by value.
+    self.assertLess(query_splitter.IdOrName(2), query_splitter.IdOrName('1'))
+    self.assertFalse(
+        query_splitter.IdOrName('1') < query_splitter.IdOrName(2))
+    self.assertLess(query_splitter.IdOrName(1), query_splitter.IdOrName(2))
+    self.assertLess(query_splitter.IdOrName('1'), query_splitter.IdOrName('2'))
+    # Unsupported types are rejected.
+    with self.assertRaises(TypeError):
+      query_splitter.IdOrName(1.5)
+
def test_client_key_sort_key(self):
k = key.Key('kind1', 1, project=self._PROJECT, namespace=self._NAMESPACE)
k2 = key.Key('kind2', 'a', parent=k)
@@ -165,6 +185,31 @@ class QuerySplitterTest(QuerySplitterTestBase):
keys.sort(key=query_splitter.client_key_sort_key)
self.assertEqual(expected_sort, keys)
+  def test_client_key_sort_key_ids(self):
+    # Numeric IDs sort numerically, regardless of input order.
+    larger = key.Key('kind', 2, project=self._PROJECT)
+    smaller = key.Key('kind', 1, project=self._PROJECT)
+    actual = sorted([larger, smaller], key=query_splitter.client_key_sort_key)
+    self.assertEqual([smaller, larger], actual)
+
+  def test_client_key_sort_key_names(self):
+    # String names sort by string comparison, regardless of input order.
+    later = key.Key('kind', '2', project=self._PROJECT)
+    earlier = key.Key('kind', '1', project=self._PROJECT)
+    actual = sorted([later, earlier], key=query_splitter.client_key_sort_key)
+    self.assertEqual([earlier, later], actual)
+
+  def test_client_key_sort_key_ids_vs_names(self):
+    # Keys with IDs always come before keys with names.
+    named = key.Key('kind', '1', project=self._PROJECT)
+    numbered = key.Key('kind', 2, project=self._PROJECT)
+    actual = sorted([named, numbered], key=query_splitter.client_key_sort_key)
+    self.assertEqual([numbered, named], actual)
+
# Hide base class from collection by nose.
del QuerySplitterTestBase