[ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=199481&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-199481
 ]

ASF GitHub Bot logged work on BEAM-3342:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Feb/19 22:16
            Start Date: 15/Feb/19 22:16
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on pull request #7737: 
[BEAM-3342] Create a Cloud Bigtable Python connector Read
URL: https://github.com/apache/beam/pull/7737#discussion_r257403607
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##########
 @@ -141,3 +147,197 @@ def expand(self, pvalue):
             | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
                                           beam_options['instance_id'],
                                           beam_options['table_id'])))
+
+
+class _BigTableReadFn(iobase.BoundedSource):
+  """ Creates the connector to get the rows in Bigtable and using each
+  row in beam pipe line
+  Args:
+    project_id(str): GCP Project ID
+    instance_id(str): GCP Instance ID
+    table_id(str): GCP Table ID
+    row_set(RowSet): Creating a set of row keys and row ranges.
+    filter_(RowFilter): Filter to apply to cells in a row.
+
+  """
+  def __init__(self, project_id, instance_id, table_id,
+               row_set=None, filter_=None):
+    """ Constructor of the Read connector of Bigtable
+    Args:
+      project_id(str): GCP Project of to write the Rows
+      instance_id(str): GCP Instance to write the Rows
+      table_id(str): GCP Table to write the `DirectRows`
+    """
+    super(self.__class__, self).__init__()
+    self.beam_options = {'project_id': project_id,
+                         'instance_id': instance_id,
+                         'table_id': table_id,
+                         'row_set': row_set,
+                         'filter_': filter_}
+    self.table = None
+    self.read_row = Metrics.counter(self.__class__.__name__, 'read_row')
+
+  def __getstate__(self):
+    return self.beam_options
+
+  def __setstate__(self, options):
+    self.beam_options = {'project_id': options['project_id'],
+                         'instance_id': options['instance_id'],
+                         'table_id': options['table_id'],
+                         'row_set': options['row_set'],
+                         'filter_': options['filter_']}
+
+  def _getTable(self):
+    if self.table is None:
+      client = Client(project=self.beam_options['project_id'])
+      instance = client.instance(self.beam_options['instance_id'])
+      self.table = instance.table(self.beam_options['table_id'])
+    return self.table
+
+  def estimate_size(self):
+    size = [k.offset_bytes for k in self._getTable().sample_row_keys()][-1]
+    return size
+
+  def get_sample_row_keys(self):
+    return self._getTable().sample_row_keys()
+
+  def get_range_tracker(self, start_position, stop_position):
+    return LexicographicKeyRangeTracker(start_position, stop_position)
+
+  def split(self,
+            desired_bundle_size,
+            start_position=None,
+            stop_position=None):
+
+    if self.beam_options['row_set'] is not None:
+      for sample_row_key in self.beam_options['row_set'].row_ranges:
+        sample_row_keys = self.get_sample_row_keys()
+        for row_split in self.split_range_size(desired_bundle_size,
+                                               sample_row_keys,
+                                               sample_row_key):
+          yield row_split
+    else:
+      suma = 0
+      last_offset = 0
+      current_size = 0
+
+      start_key = b''
+      end_key = b''
+
+      sample_row_keys = self.get_sample_row_keys()
+      for sample_row_key in sample_row_keys:
+        current_size = sample_row_key.offset_bytes-last_offset
+        if suma >= desired_bundle_size:
+          end_key = sample_row_key.row_key
+          for fraction in self.range_split_fraction(suma,
+                                                    desired_bundle_size,
+                                                    start_key, end_key):
+            yield fraction
+          start_key = sample_row_key.row_key
+
+          suma = 0
+        suma += current_size
+        last_offset = sample_row_key.offset_bytes
+
+  def split_range_size(self, desired_size, sample_row_keys, range_):
+    start, end = None, None
+    l = 0
+    for sample_row in sample_row_keys:
+      current = sample_row.offset_bytes - l
+      if sample_row.row_key == b'':
+        continue
 
 Review comment:
   Please add comments to clarify.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 199481)
    Time Spent: 17h 20m  (was: 17h 10m)

> Create a Cloud Bigtable Python connector
> ----------------------------------------
>
>                 Key: BEAM-3342
>                 URL: https://issues.apache.org/jira/browse/BEAM-3342
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Solomon Duskis
>            Assignee: Solomon Duskis
>            Priority: Major
>              Labels: triaged
>          Time Spent: 17h 20m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to