[ https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=266366&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-266366 ]

ASF GitHub Bot logged work on BEAM-3342:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Jun/19 06:49
            Start Date: 25/Jun/19 06:49
    Worklog Time Spent: 10m 
      Work Description: mf2199 commented on pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r297030035
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##########
 @@ -141,3 +148,129 @@ def expand(self, pvalue):
             | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
                                           beam_options['instance_id'],
                                           beam_options['table_id'])))
+
+
+class BigtableSource(iobase.BoundedSource):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+    """Constructor of the Bigtable read connector.
+
+    Args:
+      project_id: [string] GCP Project to read the Rows from
+      instance_id: [string] GCP Instance to read the Rows from
+      table_id: [string] GCP Table to read the Rows from
+      filter_: [RowFilter] Filter to apply to columns in a row.
+    """
+    super(BigtableSource, self).__init__()
+    self._init({'project_id': project_id,
+                'instance_id': instance_id,
+                'table_id': table_id,
+                'filter_': filter_})
+
+  def __getstate__(self):
+    return self.beam_options
+
+  def __setstate__(self, options):
+    self._init(options)
+
+  def _init(self, options):
+    self.beam_options = options
+    self.table = None
+    self.sample_row_keys = None
+    self.row_count = Metrics.counter(self.__class__.__name__, 'Row count')
+
+  def _get_table(self):
+    if self.table is None:
+      self.table = Client(project=self.beam_options['project_id'])\
+                      .instance(self.beam_options['instance_id'])\
+                      .table(self.beam_options['table_id'])
+    return self.table
+
+  def get_sample_row_keys(self):
+    """ Get a sample of row keys in the table.
+
+    The returned row keys will delimit contiguous sections of the table of
+    approximately equal size, which can be used to break up the data for
+    distributed tasks like mapreduces.
+    :returns: The sampled row keys, materialized from the cancel-able
+              iterator returned by ``Table.sample_row_keys()`` and cached
+              after the first call.
+
+    ***** NOTE: For unclear reasons, the function returns a generator even
+    after the result is wrapped in a list. To use it as a list, wrap the
+    result in a list AGAIN, e.g., see 'estimate_size()'.
+    """
+    if self.sample_row_keys is None:
+      self.sample_row_keys = list(self._get_table().sample_row_keys())
+    return self.sample_row_keys
+
+  def get_range_tracker(self, start_position=b'', stop_position=b''):
+    if stop_position == b'':
+      return LexicographicKeyRangeTracker(start_position)
+    else:
+      return LexicographicKeyRangeTracker(start_position, stop_position)
+
+  def estimate_size(self):
+    return list(self.get_sample_row_keys())[-1].offset_bytes
+
+  def split(self, desired_bundle_size, start_position=None, stop_position=None):
+    """ Splits the source into a set of bundles, using the row_set if it is set.
+
+    *** At this point, only splitting an entire table into samples based on the sample row keys is supported ***
+
+    :param desired_bundle_size: the desired size (in bytes) of the bundles returned.
+    :param start_position: if specified, the position must be used as the starting position of the first bundle.
+    :param stop_position: if specified, the position must be used as the ending position of the last bundle.
+    Returns:
+       an iterator of objects of type 'SourceBundle' that gives information about the generated bundles.
+    """
+
+    if start_position is not None or stop_position is not None:
+      raise NotImplementedError
+
+    # TODO: Use the desired bundle size to split accordingly
 
 Review comment:
   As the BoundedSource [BigtableSource] has been replaced by a PTransform/DoFn, this one no longer applies.
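
The `split()` stub in the diff leaves a TODO to honour `desired_bundle_size`, and `estimate_size()` takes the table size from the offset of the last sampled row key. Below is a minimal, pure-Python sketch of how sampled row keys could be grouped into contiguous key ranges of roughly that size. It is not the code that was merged into Beam. `SampleRowKey` is a hypothetical stand-in for the `row_key`/`offset_bytes` pairs yielded by `Table.sample_row_keys()`, and `split_by_samples` is a hypothetical helper. Bigtable may report a final sample with an empty row key to mark the end of the table, so the sketch skips empty keys and closes the last range with `b''`, the same "no stop position" convention `get_range_tracker()` uses. Since the source was replaced by a PTransform/DoFn, ranges like these could, for example, be emitted as elements for a reading DoFn.

```python
from collections import namedtuple

# Hypothetical stand-in for the responses of Table.sample_row_keys(); only the
# two fields used in the diff (row_key, offset_bytes) are modelled.
SampleRowKey = namedtuple('SampleRowKey', ['row_key', 'offset_bytes'])


def estimate_size(samples):
  """Approximates the table size as the offset of the last sample."""
  return samples[-1].offset_bytes if samples else 0


def split_by_samples(samples, desired_bundle_size):
  """Groups sampled row keys into contiguous (start_key, stop_key) ranges.

  A range is closed at the first sampled key whose offset is at least
  desired_bundle_size bytes past the range start. An empty stop_key (b'')
  means "until the end of the table".
  """
  ranges = []
  start_key, start_offset = b'', 0
  for sample in samples:
    if not sample.row_key:
      continue  # an empty key marks the end of the table; handled below
    if sample.offset_bytes - start_offset >= desired_bundle_size:
      ranges.append((start_key, sample.row_key))
      start_key, start_offset = sample.row_key, sample.offset_bytes
  ranges.append((start_key, b''))  # the last range is open-ended
  return ranges


samples = [SampleRowKey(b'c', 100), SampleRowKey(b'g', 250),
           SampleRowKey(b'm', 320), SampleRowKey(b'', 400)]
print(estimate_size(samples))          # 400
print(split_by_samples(samples, 200))  # [(b'', b'g'), (b'g', b'')]
```

Splitting only at sampled keys keeps every boundary on a key Bigtable itself reported, at the cost of bundles that can be no finer than the samples returned.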
 
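
In the diff, `get_range_tracker()` constructs `LexicographicKeyRangeTracker` without a stop position whenever `stop_position` is `b''`, so an empty key stands for "no upper bound". A minimal plain-Python sketch of that half-open key-range convention follows. `key_in_range` and `owning_range` are hypothetical helpers, not Beam APIs. Beam's own `LexicographicKeyRangeTracker` (in `apache_beam.io.range_trackers`) also handles claiming positions and splitting, which this sketch does not attempt.

```python
def key_in_range(key, start_key=b'', stop_key=b''):
  """True if start_key <= key < stop_key; an empty stop_key is unbounded."""
  if key < start_key:
    return False
  return stop_key == b'' or key < stop_key


def owning_range(key, ranges):
  """Index of the first (start_key, stop_key) range containing key, or None."""
  for index, (start_key, stop_key) in enumerate(ranges):
    if key_in_range(key, start_key, stop_key):
      return index
  return None


ranges = [(b'', b'g'), (b'g', b'')]
print(owning_range(b'apple', ranges))  # 0
print(owning_range(b'g', ranges))      # 1 (stop keys are exclusive)
print(owning_range(b'zebra', ranges))  # 1 (open-ended last range)
```

Keeping stop keys exclusive means adjacent ranges that share a boundary key never both claim the same row.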


Issue Time Tracking
-------------------

    Worklog Id:     (was: 266366)
    Time Spent: 23h 50m  (was: 23h 40m)

> Create a Cloud Bigtable IO connector for Python
> -----------------------------------------------
>
>                 Key: BEAM-3342
>                 URL: https://issues.apache.org/jira/browse/BEAM-3342
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Solomon Duskis
>            Assignee: Solomon Duskis
>            Priority: Major
>          Time Spent: 23h 50m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable Python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
