[ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=213901&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-213901
 ]

ASF GitHub Bot logged work on BEAM-3342:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Mar/19 17:35
            Start Date: 15/Mar/19 17:35
    Worklog Time Spent: 10m 
      Work Description: sduskis commented on pull request #7737: [BEAM-3342] 
Create a Cloud Bigtable Python connector Read
URL: https://github.com/apache/beam/pull/7737#discussion_r266079985
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##########
 @@ -141,3 +147,273 @@ def expand(self, pvalue):
             | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
                                           beam_options['instance_id'],
                                           beam_options['table_id'])))
+
+
+class _BigTableSource(iobase.BoundedSource):
+  """ Creates the connector to get the rows in Bigtable and using each
+  row in beam pipe line
+  Args:
+    project_id(str): GCP Project ID
+    instance_id(str): GCP Instance ID
+    table_id(str): GCP Table ID
+    row_set(RowSet): A set of row keys and row ranges to read.
+    filter_(RowFilter): Filter to apply to cells in a row.
+
+  """
+  def __init__(self, project_id, instance_id, table_id,
+               row_set=None, filter_=None):
+    """ Constructor of the Read connector of Bigtable
+    Args:
+      project_id(str): GCP Project of to write the Rows
+      instance_id(str): GCP Instance to write the Rows
+      table_id(str): GCP Table to write the `DirectRows`
+      row_set(RowSet): This variable represents the RowRanges
+      you want to use, It used on the split, to set the split
+      only in that ranges.
+      filter_(RowFilter): Get some expected rows, bases on
+      certainly information in the row.
+    """
+    super(_BigTableSource, self).__init__()
+    self.beam_options = {'project_id': project_id,
+                         'instance_id': instance_id,
+                         'table_id': table_id,
+                         'row_set': row_set,
+                         'filter_': filter_}
+    self.table = None
+    self.read_row = Metrics.counter(self.__class__.__name__, 'read_row')
+
+  def __getstate__(self):
+    return self.beam_options
+
+  def __setstate__(self, options):
+    self.beam_options = options
+    self.table = None
+    self.read_row = Metrics.counter(self.__class__.__name__, 'read_row')
+
+  def _getTable(self):
+    if self.table is None:
+      client = Client(project=self.beam_options['project_id'])
+      instance = client.instance(self.beam_options['instance_id'])
+      self.table = instance.table(self.beam_options['table_id'])
+    return self.table
+
+  def estimate_size(self):
+    size = [k.offset_bytes for k in self.get_sample_row_keys()][-1]
+    return size
+
+  def get_sample_row_keys(self):
+    ''' Get a sample of row keys in the table.
+
+    The returned row keys will delimit contiguous sections of the table of
+    approximately equal size, which can be used to break up the data for
+    distributed tasks like mapreduces.
+
+    :returns: A cancel-able iterator. Can be consumed by calling ``next()``
+                  or by casting to a :class:`list` and can be cancelled by
+                  calling ``cancel()``.
+    '''
+    return self._getTable().sample_row_keys()
+
+  def get_range_tracker(self, start_position, stop_position):
+    return LexicographicKeyRangeTracker(start_position, stop_position)
+
+  def split(self, desired_bundle_size, start_position=None,
+            stop_position=None):
+    ''' Splits the source into a set of bundles, using the row_set if it is
+    provided.
+    Bundles should be approximately ``desired_bundle_size`` bytes; if a
+    bundle is bigger, ``range_split_fraction`` is used to split it into
+    smaller fractions.
+
+    :param desired_bundle_size: the desired size (in bytes) of the bundles
+    returned.
+    :param start_position: if specified, the given position must be used as
+    the starting position of the first bundle.
+    :param stop_position: if specified, the given position must be used as
+    the ending position of the last bundle.
+
+    Returns:
+      an iterator over objects of type 'SourceBundle' that gives information
+      about the generated bundles.
+    '''
+
+    # The row_set limits the read to certain row ranges; it is set in the
+    # constructor of this class.
+    if self.beam_options['row_set'] is not None:
+      for sample_row_key in self.beam_options['row_set'].row_ranges:
 
 Review comment:
   Please rename `sample_row_key` to `row_range`
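   For illustration, a minimal sketch of how the loop could read after the
   rename; the loop body is hypothetical (the diff is truncated at this
   point) and the `SourceBundle` weight of 1 is a placeholder, not the
   author's actual code:

       if self.beam_options['row_set'] is not None:
         for row_range in self.beam_options['row_set'].row_ranges:
           # Each RowRange carries a start_key and an end_key, which can
           # seed a per-range bundle; ranges larger than
           # desired_bundle_size could then be split further.
           yield iobase.SourceBundle(weight=1,
                                     source=self,
                                     start_position=row_range.start_key,
                                     stop_position=row_range.end_key)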
 
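   As a usage note, a hedged sketch of how this source might be wired into a
   pipeline with the generic beam.io.Read transform; the project, instance
   and table names are placeholders, and the private class is imported
   directly only for illustration (the PR may expose a higher-level
   transform instead):

       import apache_beam as beam
       from apache_beam.io.gcp.bigtableio import _BigTableSource

       with beam.Pipeline() as p:
         rows = (p
                 | 'ReadFromBigtable' >> beam.io.Read(_BigTableSource(
                     project_id='my-project',
                     instance_id='my-instance',
                     table_id='my-table')))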
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 213901)
    Time Spent: 18h 50m  (was: 18h 40m)

> Create a Cloud Bigtable Python connector
> ----------------------------------------
>
>                 Key: BEAM-3342
>                 URL: https://issues.apache.org/jira/browse/BEAM-3342
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Solomon Duskis
>            Assignee: Solomon Duskis
>            Priority: Major
>              Labels: triaged
>          Time Spent: 18h 50m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
