[ https://issues.apache.org/jira/browse/BEAM-4543?focusedWorklogId=230336&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-230336 ]

ASF GitHub Bot logged work on BEAM-4543:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/19 00:00
            Start Date: 20/Apr/19 00:00
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on pull request #8262: 
[BEAM-4543] Python Datastore IO using google-cloud-datastore
URL: https://github.com/apache/beam/pull/8262#discussion_r276876716
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
 ##########
 @@ -0,0 +1,477 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A connector for reading from and writing to Google Cloud Datastore.
+
+Uses the newer google-cloud-datastore pip dependency.
+
+For Datastore entities, this connector does not support the property value
+"meaning" field.
+
+This module is experimental; no backwards-compatibility guarantees.
+"""
+from __future__ import absolute_import
+from __future__ import division
+
+import logging
+import time
+from builtins import object
+from builtins import round
+
+from apache_beam import typehints
+from apache_beam.io.gcp.datastore.v1 import util
+from apache_beam.io.gcp.datastore.v1.adaptive_throttler import AdaptiveThrottler
+from apache_beam.io.gcp.datastore.v1new import helper
+from apache_beam.io.gcp.datastore.v1new import query_splitter
+from apache_beam.io.gcp.datastore.v1new import types
+from apache_beam.metrics.metric import Metrics
+from apache_beam.transforms import Create
+from apache_beam.transforms import DoFn
+from apache_beam.transforms import ParDo
+from apache_beam.transforms import PTransform
+from apache_beam.transforms import Reshuffle
+
+__all__ = ['QueryDatastore', 'WriteToDatastore', 'DeleteFromDatastore']
+
+
+@typehints.with_output_types(types.Entity)
+class QueryDatastore(PTransform):
+  """A ``PTransform`` for querying Google Cloud Datastore.
+
+  To read a ``PCollection[Entity]`` from a Cloud Datastore ``Query``, use the
+  ``QueryDatastore`` transform, providing the `query` to read from. The
+  project and optional namespace are set in the query.
+  The query will be split into multiple queries to allow for parallelism. The
+  degree of parallelism is automatically determined, but can be overridden by
+  setting `num_splits` to a value of 1 or greater.
+
+  Note: Normally, a runner will read from Cloud Datastore in parallel across
+  many workers. However, when the `query` is configured with a `limit` or if the
+  query contains inequality filters like `GREATER_THAN`, `LESS_THAN`, etc., then
+  all the returned results will be read by a single worker in order to ensure
+  correct data. Since data is read from a single worker, this could have a
+  significant impact on the performance of the job.
+
+  The semantics for query splitting are defined below:
+    1. If `num_splits` is equal to 0, then the number of splits will be chosen
+    dynamically at runtime based on the query data size.
+
+    2. Any value of `num_splits` greater than
+    `QueryDatastore._NUM_QUERY_SPLITS_MAX` will be capped at that value.
+
+    3. If the `query` has a user limit set, or contains inequality filters, then
+    `num_splits` will be ignored and no split will be performed.
+
+    4. In certain cases, Cloud Datastore is unable to split a query into the
+    requested number of splits. In such cases, we just use whatever Cloud
+    Datastore returns.
+
+  See https://developers.google.com/datastore/ for more details on Google Cloud
+  Datastore.
+  """
+
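+  # A minimal usage sketch (illustrative only: assumes `import apache_beam as
+  # beam`, that types.Query accepts `kind` and `project` keyword arguments,
+  # and that 'Task' and 'my-project' are placeholder names):
+  #
+  #   with beam.Pipeline() as p:
+  #     entities = (p
+  #                 | 'Query' >> QueryDatastore(
+  #                     types.Query(kind='Task', project='my-project')))
+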
+  # An upper bound on the number of splits for a query.
+  _NUM_QUERY_SPLITS_MAX = 50000
+  # A lower bound on the number of splits for a query. This is to ensure that
+  # we parallelize the query even when Datastore statistics are not available.
+  _NUM_QUERY_SPLITS_MIN = 12
+  # Default bundle size of 64MB.
+  _DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024
+
+  def __init__(self, query, num_splits=0):
+    """Initialize the `QueryDatastore` transform.
+
+    This transform outputs elements of type
+    :class:`~apache_beam.io.gcp.datastore.v1new.types.Entity`.
+
+    Args:
+      query: (:class:`~apache_beam.io.gcp.datastore.v1new.types.Query`) query
+        used to fetch entities.
+      num_splits: (:class:`int`) (optional) Number of splits for the query.
+    """
+    super(QueryDatastore, self).__init__()
+
+    if not query:
+      raise ValueError("query cannot be empty")
+    if not query.project:
+      raise ValueError("query.project cannot be empty")
+    if num_splits < 0:
+      raise ValueError("num_splits must be greater than or equal to 0")
+
+    self._project = query.project
+    # using _namespace conflicts with DisplayData._namespace
+    self._datastore_namespace = query.namespace
+    self._query = query
+    self._num_splits = num_splits
+
+  def expand(self, pcoll):
+    # This is a composite transform that involves the following:
+    #   1. Create a singleton of the user-provided `query` and apply a
+    #   ``ParDo`` that splits the query into `num_splits` queries if possible.
+    #
+    #   If the value of `num_splits` is 0, the number of splits will be
+    #   computed dynamically based on the size of the data for the `query`.
+    #
+    #   2. The resulting ``PCollection`` is sharded across workers using a
+    #   ``Reshuffle`` operation.
+    #
+    #   3. In the third step, a ``ParDo`` reads entities for each query and
+    #   outputs a ``PCollection[Entity]``.
+
+    return (pcoll.pipeline
+            | 'UserQuery' >> Create([self._query])
+            | 'SplitQuery' >> ParDo(QueryDatastore._SplitQueryFn(
+                self._num_splits))
+            | Reshuffle()
+            | 'Read' >> ParDo(QueryDatastore._QueryFn()))
+
+  def display_data(self):
+    disp_data = {'project': self._query.project,
+                 'query': str(self._query),
+                 'num_splits': self._num_splits}
+
+    if self._datastore_namespace is not None:
+      disp_data['namespace'] = self._datastore_namespace
+
+    return disp_data
+
+  @typehints.with_input_types(types.Query)
+  @typehints.with_output_types(types.Query)
+  class _SplitQueryFn(DoFn):
+    """A `DoFn` that splits a given query into multiple sub-queries."""
+    def __init__(self, num_splits):
+      super(QueryDatastore._SplitQueryFn, self).__init__()
+      self._num_splits = num_splits
+
+    def process(self, query, *args, **kwargs):
+      client = helper.get_client(query.project, query.namespace)
+      try:
+        # Short circuit estimating num_splits if split is not possible.
+        query_splitter.validate_split(query)
+
+        if self._num_splits == 0:
+          estimated_num_splits = self.get_estimated_num_splits(client, query)
+        else:
+          estimated_num_splits = self._num_splits
+
+        logging.info("Splitting the query into %d splits", 
estimated_num_splits)
+        query_splits = query_splitter.get_splits(
+            client, query, estimated_num_splits)
+      except query_splitter.QuerySplitterError:
+        logging.info("Unable to parallelize the given query: %s", query,
+                     exc_info=True)
+        query_splits = [query]
+
+      return query_splits
+
+    def display_data(self):
+      disp_data = {'num_splits': self._num_splits}
+      return disp_data
+
+    @staticmethod
+    def query_latest_statistics_timestamp(client):
+      """Fetches the latest timestamp of statistics from Cloud Datastore.
+
+      Cloud Datastore system tables with statistics are periodically updated.
+      This method fetches the latest timestamp (in microseconds) of statistics
+      update using the `__Stat_Total__` table.
+      """
+      if client.namespace is None:
+        kind = '__Stat_Total__'
+      else:
+        kind = '__Stat_Ns_Total__'
+      query = client.query(kind=kind, order=["-timestamp", ])
+      entities = list(query.fetch(limit=1))
+      if not entities:
+        raise RuntimeError("Datastore total statistics unavailable.")
+      return entities[0]['timestamp']
+
+    @staticmethod
+    def get_estimated_size_bytes(client, query):
+      """Get the estimated size of the data returned by this instance's query.
+
+      Cloud Datastore provides no way to get a good estimate of how large the
+      result of a query is going to be. Hence we use the __Stat_Kind__ system
+      table to get the size of the entire kind as an approximate estimate,
+      assuming exactly 1 kind is specified in the query.
+      See https://cloud.google.com/datastore/docs/concepts/stats.
+      """
+      kind_name = query.kind
+      latest_timestamp = (
+          QueryDatastore._SplitQueryFn
+          .query_latest_statistics_timestamp(client))
+      logging.info('Latest stats timestamp for kind %s is %s',
+                   kind_name, latest_timestamp)
+
+      if client.namespace is None:
+        kind = '__Stat_Kind__'
+      else:
+        kind = '__Stat_Ns_Kind__'
+      query = client.query(kind=kind)
+      query.add_filter('kind_name', '=', kind_name)
+      query.add_filter('timestamp', '=', latest_timestamp)
+
+      entities = list(query.fetch(limit=1))
+      if not entities:
+        raise RuntimeError(
+            'Datastore statistics for kind %s unavailable' % kind_name)
+      return entities[0]['entity_bytes']
+
+    @staticmethod
+    def get_estimated_num_splits(client, query):
+      """Computes the number of splits to be performed on the query."""
+      try:
+        estimated_size_bytes = (
+            QueryDatastore._SplitQueryFn
+            .get_estimated_size_bytes(client, query))
+        logging.info('Estimated size bytes for query: %s', estimated_size_bytes)
+        num_splits = int(min(QueryDatastore._NUM_QUERY_SPLITS_MAX, round(
+            (float(estimated_size_bytes) /
+             QueryDatastore._DEFAULT_BUNDLE_SIZE_BYTES))))
+      except Exception as e:
+        logging.warning('Failed to fetch estimated size bytes: %s', e)
+        # Fallback in case estimated size is unavailable.
+        num_splits = QueryDatastore._NUM_QUERY_SPLITS_MIN
+
+      return max(num_splits, QueryDatastore._NUM_QUERY_SPLITS_MIN)
+
+  @typehints.with_input_types(types.Query)
+  @typehints.with_output_types(types.Entity)
+  class _QueryFn(DoFn):
+    """A DoFn that fetches entities from Cloud Datastore, for a given query."""
+    def process(self, query, *unused_args, **unused_kwargs):
+      _client = helper.get_client(query.project, query.namespace)
+      client_query = query._to_client_query(_client)
 
 Review comment:
  It's strange that we invoke a private method for this crucial API call.
Should this method "_to_client_query()" be updated to "to_client_query()" (or
is there an alternate public API)?
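
  For reference, a rough sketch of the suggested rename (a proposal only, not
existing code; it keeps the current private method as the implementation):

      class Query(object):
        # Proposed public entry point; delegates to the existing private
        # helper so callers do not depend on an underscore-prefixed name.
        def to_client_query(self, client):
          return self._to_client_query(client)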
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 230336)
    Time Spent: 1h 40m  (was: 1.5h)

> Remove dependency on googledatastore in favor of google-cloud-datastore.
> ------------------------------------------------------------------------
>
>                 Key: BEAM-4543
>                 URL: https://issues.apache.org/jira/browse/BEAM-4543
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Udi Meiri
>            Priority: Minor
>              Labels: triaged
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> apache-beam[gcp] package depends [1] on googledatastore package [2]. We 
> should replace this dependency with google-cloud-datastore [3] which is 
> officially supported, has better release cadence and also has Python 3 
> support.
> [1] 
> https://github.com/apache/beam/blob/fad655462f8fadfdfaab0b7a09cab538f076f94e/sdks/python/setup.py#L126
> [2] [https://pypi.org/project/googledatastore/]
> [3] [https://pypi.org/project/google-cloud-datastore/]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
