Repository: beam Updated Branches: refs/heads/master 9da46fd05 -> 597b07e0d
Comply with byte limit for Datastore Commit. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c0f599d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c0f599d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c0f599d Branch: refs/heads/master Commit: 3c0f599d64a7f57608f1c18b05f2ab036a8b02fc Parents: 9da46fd Author: Colin Phipps <[email protected]> Authored: Wed May 10 09:50:56 2017 +0000 Committer: Ahmet Altay <[email protected]> Committed: Tue May 23 08:39:09 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/io/gcp/datastore/v1/datastoreio.py | 15 ++++++++++++++- .../io/gcp/datastore/v1/datastoreio_test.py | 17 +++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3c0f599d/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py index c606133..89c2a93 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py @@ -313,8 +313,12 @@ class _Mutate(PTransform): supported, as the commits are retried when failures occur. """ - # Max allowed Datastore write batch size. + # Max allowed Datastore writes per batch, and max bytes per batch. + # Note that the max bytes per batch set here is lower than the 10MB limit + # actually enforced by the API, to leave space for the CommitRequest wrapper + # around the mutations. _WRITE_BATCH_SIZE = 500 + _WRITE_BATCH_BYTES_SIZE = 9000000 def __init__(self, project, mutation_fn): """Initializes a Mutate transform. @@ -353,13 +357,20 @@ class _Mutate(PTransform): self._project = project self._datastore = None self._mutations = [] + self._mutations_size = 0 # Total size of entries in _mutations. def start_bundle(self): self._mutations = [] + self._mutations_size = 0 self._datastore = helper.get_datastore(self._project) def process(self, element): + size = element.ByteSize() + if (self._mutations and + size + self._mutations_size > _Mutate._WRITE_BATCH_BYTES_SIZE): + self._flush_batch() self._mutations.append(element) + self._mutations_size += size if len(self._mutations) >= _Mutate._WRITE_BATCH_SIZE: self._flush_batch() @@ -367,12 +378,14 @@ class _Mutate(PTransform): if self._mutations: self._flush_batch() self._mutations = [] + self._mutations_size = 0 def _flush_batch(self): # Flush the current batch of mutations to Cloud Datastore. helper.write_mutations(self._datastore, self._project, self._mutations) logging.debug("Successfully wrote %d mutations.", len(self._mutations)) self._mutations = [] + self._mutations_size = 0 class WriteToDatastore(_Mutate): http://git-wip-us.apache.org/repos/asf/beam/blob/3c0f599d/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py index 6adc08a..424e714 100644 --- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py +++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py @@ -15,6 +15,7 @@ # limitations under the License. # +import math import unittest from mock import MagicMock, call, patch @@ -191,6 +192,22 @@ class DatastoreioTest(unittest.TestCase): self.assertEqual((num_entities - 1) / _Mutate._WRITE_BATCH_SIZE + 1, self._mock_datastore.commit.call_count) + def test_DatastoreWriteLargeEntities(self): + """100*100kB entities gets split over two Commit RPCs.""" + with patch.object(helper, 'get_datastore', + return_value=self._mock_datastore): + entities = [e.entity for e in fake_datastore.create_entities(100)] + + datastore_write_fn = _Mutate.DatastoreWriteFn(self._PROJECT) + datastore_write_fn.start_bundle() + for entity in entities: + datastore_helper.add_properties( + entity, {'large': u'A' * 100000}, exclude_from_indexes=True) + datastore_write_fn.process(WriteToDatastore.to_upsert_mutation(entity)) + datastore_write_fn.finish_bundle() + + self.assertEqual(2, self._mock_datastore.commit.call_count) + def verify_unique_keys(self, queries): """A helper function that verifies if all the queries have unique keys.""" keys, _ = zip(*queries)
