[ https://issues.apache.org/jira/browse/BEAM-4543?focusedWorklogId=230334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-230334 ]
ASF GitHub Bot logged work on BEAM-4543:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Apr/19 00:00
Start Date: 20/Apr/19 00:00
Worklog Time Spent: 10m
Work Description: chamikaramj commented on pull request #8262:
[BEAM-4543] Python Datastore IO using google-cloud-datastore
URL: https://github.com/apache/beam/pull/8262#discussion_r277060875
##########
File path: sdks/python/apache_beam/io/gcp/datastore/v1new/helper.py
##########
@@ -0,0 +1,125 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Cloud Datastore client and test functions.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+
+from __future__ import absolute_import
+
+import logging
+import time
+import uuid
+from builtins import range
+
+from cachetools.func import ttl_cache
+from google.api_core import exceptions
+from google.cloud.datastore import client
+
+from apache_beam.io.gcp.datastore.v1new import types
+from apache_beam.utils import retry
+
+# https://cloud.google.com/datastore/docs/concepts/errors#error_codes
+_RETRYABLE_DATASTORE_ERRORS = (
+ exceptions.Aborted,
+ exceptions.DeadlineExceeded,
+ exceptions.InternalServerError,
+ exceptions.ServiceUnavailable,
+)
+
+
+@ttl_cache(maxsize=128, ttl=3600)
+def get_client(project, namespace):
+ """Returns a Cloud Datastore client."""
+ _client = client.Client(project=project, namespace=namespace)
+ _client.base_url = 'https://batch-datastore.googleapis.com' # BEAM-1387
+ return _client
+
+
+def retry_on_rpc_error(exception):
+ """A retry filter for Cloud Datastore RPCErrors."""
+ return isinstance(exception, _RETRYABLE_DATASTORE_ERRORS)
+
+
+@retry.with_exponential_backoff(num_retries=5,
+ retry_filter=retry_on_rpc_error)
+def write_mutations(batch, throttler, rpc_stats_callback, throttle_delay=1):
+ """A helper function to write a batch of mutations to Cloud Datastore.
+
+ If a commit fails, it will be retried up to 5 times. All mutations in the
+ batch will be committed again, even if the commit was partially successful.
+ If the retry limit is exceeded, the last exception from Cloud Datastore will
+ be raised.
+
+ Assumes that the Datastore client library does not perform any retries on
+ commits. It has not been determined how such retries would interact with the
+ retries and throttler used here.
+ See ``google.cloud.datastore_v1.gapic.datastore_client_config`` for
+ retry config.
+
+ Args:
+ batch: (:class:`~google.cloud.datastore.batch.Batch`) An instance of an
+ in-progress batch.
+ rpc_stats_callback: a function to call with arguments `successes`,
+ `failures`, and `throttled_secs`; it is called to record successful
+ and failed RPCs to Datastore and time spent waiting for throttling.
Review comment:
For test utils, please either reuse the same classes from v1/ or move as much
code as possible into shared helper/util classes to minimize code duplication.
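The retry contract described in the `write_mutations` docstring (retry up to 5 times, resend the whole batch on every attempt, re-raise the last error, and report each RPC outcome through `rpc_stats_callback`) can be illustrated with a dependency-free sketch. It assumes hypothetical `RetryableError`/`PermanentError` classes in place of the `google.api_core` exceptions, substitutes a simplified decorator for Beam's `apache_beam.utils.retry.with_exponential_backoff`, uses a tiny initial delay so the demo runs quickly, and leaves out the throttler entirely.

```python
import functools
import random
import time


class RetryableError(Exception):
  """Hypothetical stand-in for e.g. google.api_core.exceptions.Aborted."""


class PermanentError(Exception):
  """Hypothetical stand-in for a non-retryable error."""


_RETRYABLE_ERRORS = (RetryableError,)


def retry_on_rpc_error(exception):
  """Retry filter: True only for errors listed in _RETRYABLE_ERRORS."""
  return isinstance(exception, _RETRYABLE_ERRORS)


def with_exponential_backoff(num_retries=5, initial_delay_secs=1.0, factor=2,
                             retry_filter=retry_on_rpc_error):
  """Simplified stand-in, NOT Beam's apache_beam.utils.retry implementation."""
  def decorator(fun):
    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
      delay = initial_delay_secs
      for attempt in range(num_retries + 1):
        try:
          return fun(*args, **kwargs)
        except Exception as e:  # pylint: disable=broad-except
          # Re-raise non-retryable errors immediately, and re-raise the last
          # error once the retry budget is exhausted.
          if not retry_filter(e) or attempt == num_retries:
            raise
          # Sleep with random jitter, then grow the delay geometrically.
          time.sleep(delay * random.uniform(0.5, 1.5))
          delay *= factor
    return wrapper
  return decorator


@with_exponential_backoff(num_retries=5, initial_delay_secs=0.01,
                          retry_filter=retry_on_rpc_error)
def write_mutations(batch, commit, rpc_stats_callback):
  """Commits the whole batch; every retry resends every mutation."""
  try:
    result = commit(batch)
  except Exception:
    rpc_stats_callback(failures=1)
    raise
  rpc_stats_callback(successes=1)
  return result


# Hypothetical stats collector matching the rpc_stats_callback arguments.
stats = {'successes': 0, 'failures': 0, 'throttled_secs': 0}


def record_stats(successes=0, failures=0, throttled_secs=0):
  stats['successes'] += successes
  stats['failures'] += failures
  stats['throttled_secs'] += throttled_secs


calls = []


def flaky_commit(batch):
  """Hypothetical commit RPC that fails twice before succeeding."""
  calls.append(list(batch))
  if len(calls) <= 2:
    raise RetryableError('transient')
  return len(batch)


result = write_mutations(['m1', 'm2'], flaky_commit, record_stats)
print(result, stats)  # 2 {'successes': 1, 'failures': 2, 'throttled_secs': 0}
```

Because the filter admits only the listed error types, a non-retryable error propagates on the first attempt, while a persistent retryable error is attempted six times (one call plus five retries) before the last exception is re-raised.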
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 230334)
Time Spent: 1h 20m (was: 1h 10m)
> Remove dependency on googledatastore in favor of google-cloud-datastore.
> ------------------------------------------------------------------------
>
> Key: BEAM-4543
> URL: https://issues.apache.org/jira/browse/BEAM-4543
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Valentyn Tymofieiev
> Assignee: Udi Meiri
> Priority: Minor
> Labels: triaged
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> The apache-beam[gcp] package depends [1] on the googledatastore package [2].
> We should replace this dependency with google-cloud-datastore [3], which is
> officially supported, has a better release cadence, and also supports
> Python 3.
> [1] https://github.com/apache/beam/blob/fad655462f8fadfdfaab0b7a09cab538f076f94e/sdks/python/setup.py#L126
> [2] https://pypi.org/project/googledatastore/
> [3] https://pypi.org/project/google-cloud-datastore/
>
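Because the new module builds `google.cloud.datastore` clients directly, `get_client` in the diff above memoizes one client per `(project, namespace)` pair for an hour via `cachetools.func.ttl_cache(maxsize=128, ttl=3600)`. The sketch below reproduces only that TTL behaviour with the standard library, so it runs without google-cloud-datastore. `ttl_memoize`, `FakeClient`, and the injectable fake clock are hypothetical; unlike the real decorator, this one does not bound the cache size (no `maxsize`) and is not thread-safe.

```python
import functools
import time


def ttl_memoize(ttl, clock=time.monotonic):
  """Hypothetical helper: caches results per argument tuple for `ttl` secs."""
  def decorator(fun):
    cache = {}  # args tuple -> (value, time the value was created)

    @functools.wraps(fun)
    def wrapper(*args):
      now = clock()
      entry = cache.get(args)
      if entry is not None and now - entry[1] < ttl:
        return entry[0]  # Fresh entry: reuse the cached client.
      value = fun(*args)  # Missing or expired: build a new one.
      cache[args] = (value, now)
      return value
    return wrapper
  return decorator


class FakeClient(object):
  """Hypothetical stand-in for google.cloud.datastore.client.Client."""

  def __init__(self, project, namespace):
    self.project = project
    self.namespace = namespace
    self.base_url = None


fake_now = [0.0]  # Fake clock so expiry can be demonstrated instantly.


@ttl_memoize(ttl=3600, clock=lambda: fake_now[0])
def get_client(project, namespace):
  _client = FakeClient(project=project, namespace=namespace)
  _client.base_url = 'https://batch-datastore.googleapis.com'  # BEAM-1387
  return _client


a = get_client('my-project', None)
b = get_client('my-project', None)
c = get_client('my-project', 'ns1')
print(a is b, a is c)  # True False
fake_now[0] = 3601.0  # Advance past the one-hour TTL.
print(get_client('my-project', None) is a)  # False
```

Keying on the argument tuple is what gives each `(project, namespace)` pair its own client, and the TTL bounds how long a single client object is reused.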
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)