[ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=358293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358293
 ]

ASF GitHub Bot logged work on BEAM-3342:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Dec/19 05:07
            Start Date: 12/Dec/19 05:07
    Worklog Time Spent: 10m 
      Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r356960074
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##########
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
   A PTransform that write a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-               table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
     """ The PTransform to access the Bigtable Write connector
     Args:
       project_id(str): GCP Project of to write the Rows
       instance_id(str): GCP Instance to write the Rows
       table_id(str): GCP Table to write the `DirectRows`
     """
     super(WriteToBigTable, self).__init__()
-    self.beam_options = {'project_id': project_id,
-                         'instance_id': instance_id,
-                         'table_id': table_id}
+    self._beam_options = {'project_id': project_id,
+                          'instance_id': instance_id,
+                          'table_id': table_id}
 
   def expand(self, pvalue):
-    beam_options = self.beam_options
+    beam_options = self._beam_options
     return (pvalue
             | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
                                           beam_options['instance_id'],
                                           beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+    project_id(str): GCP Project ID
+    instance_id(str): GCP Instance ID
+    table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+    """ Constructor of the Read connector of Bigtable
+
+    Args:
+      project_id: [str] GCP Project of to write the Rows
+      instance_id: [str] GCP Instance to write the Rows
+      table_id: [str] GCP Table to write the `DirectRows`
+      filter_: [RowFilter] Filter to apply to columns in a row.
+    """
+    super(self.__class__, self).__init__()
+    self._initialize({'project_id': project_id,
+                      'instance_id': instance_id,
+                      'table_id': table_id,
+                      'filter_': filter_})
+
+  def __getstate__(self):
+    return self._beam_options
+
+  def __setstate__(self, options):
+    self._initialize(options)
+
+  def _initialize(self, options):
+    self._beam_options = options
+    self.table = None
+    self.sample_row_keys = None
+    self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+    if self.table is None:
+      self.table = Client(project=self._beam_options['project_id'])\
+                    .instance(self._beam_options['instance_id'])\
+                    .table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+    for row in self.table.read_rows(start_key=element.start_position,
+                                    end_key=element.end_position,
+                                    filter_=self._beam_options['filter_']):
+      self.row_count.inc()
+      yield row
+
+  def display_data(self):
+    return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+                                         label='Bigtable Project Id'),
+            'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+                                          label='Bigtable Instance Id'),
+            'tableId': DisplayDataItem(self._beam_options['table_id'],
+                                       label='Bigtable Table Id'),
+            'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+                                       label='Bigtable Filter')
+           }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+    """ The PTransform to access the Bigtable Read connector
+
+    Args:
+      project_id: [str] GCP Project of to read the Rows
+      instance_id): [str] GCP Instance to read the Rows
+      table_id): [str] GCP Table to read the Rows
+      filter_: [RowFilter] Filter to apply to columns in a row.
+    """
+    super(self.__class__, self).__init__()
+    self._beam_options = {'project_id': project_id,
+                          'instance_id': instance_id,
+                          'table_id': table_id,
+                          'filter_': filter_}
+
+  def __getstate__(self):
+    return self._beam_options
+
+  def __setstate__(self, options):
+    self._beam_options = options
+
+  def expand(self, pbegin):
+    from apache_beam.transforms import util
+
+    beam_options = self._beam_options
+    table = Client(project=beam_options['project_id'])\
+                .instance(beam_options['instance_id'])\
+                .table(beam_options['table_id'])
+    sample_row_keys = list(table.sample_row_keys())
+
+    if len(sample_row_keys) > 1 and sample_row_keys[0].row_key != b'':
+      SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+      first_key = SampleRowKey(b'', 0)
+      sample_row_keys.insert(0, first_key)
+      sample_row_keys = list(sample_row_keys)
+
+    def split_source(unused_impulse):
+      bundles = []
+      for i in range(1, len(sample_row_keys)):
 
 Review comment:
   @chamikaramj It might, but then how could it work a few months back? The 
code below works now, with Dataflow, albeit from a standalone file, meaning 
that pickling is probably not an issue:
   
   ```
   ... [imports and definitions] ...
   
   class _BigTableReadFn(beam.DoFn):
        def __init__(self, project_id, instance_id, table_id, filter_=None):
                super(self.__class__, self).__init__()
                self._options = {'project_id': project_id, 'instance_id': 
instance_id, 'table_id': table_id, 'filter_': filter_}
                self._initialize()
   
        def _initialize(self):
                from apache_beam.metrics import Metrics
                self._table = None
                self._counter = Metrics.counter(self.__class__, 'Rows Read')
   
        def __getstate__(self):
                return self._options
   
        def __setstate__(self, options):
                self._initialize()
                self._options = options
   
        def start_bundle(self):
                from google.cloud.bigtable import Client
                if self._table is None:
                        _client = Client(self._options['project_id'])
                        _instance = 
_client.instance(self._options['instance_id'])
                        self._table = _instance.table(self._options['table_id'])
   
        def process(self, source_bundle):
                _start_key = source_bundle.start_position
                _end_key = source_bundle.stop_position
                for row in self._table.read_rows(_start_key, _end_key):
                        self._counter.inc()
                        yield row
   
        def finish_bundle(self):
                pass
   
        def display_data(self):
                return {'projectId': 
DisplayDataItem(self._options['project_id'], label='Bigtable Project Id'),
                                'instanceId': 
DisplayDataItem(self._options['instance_id'], label='Bigtable Instance Id'),
                                'tableId': 
DisplayDataItem(self._options['table_id'], label='Bigtable Table Id')}
   
   table = Client(PROJECT_ID, admin=True).instance(INSTANCE_ID).table(TABLE_ID)
   keys = table.sample_row_keys()
   key_list = list(keys)
   
   SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
   key_list.insert(0, SampleRowKey(b'', 0))
   
   def bundles():
        for i in range(1, len(key_list)):
                key_1 = key_list[i - 1].row_key
                key_2 = key_list[i].row_key
                size = key_list[i].offset_bytes - key_list[i - 1].offset_bytes
                yield iobase.SourceBundle(size, None, key_1, key_2)
   
   p = beam.Pipeline(options=p_options)
   count = (p
                 | 'Bundles' >> beam.Create(bundles())
                 | 'Read' >> beam.ParDo(_BigTableReadFn(PROJECT_ID, 
INSTANCE_ID, TABLE_ID))
                 | 'Count' >> Count.Globally()
                 )
   
   result = p.run()
   result.wait_until_finish()
   
    ... [assertions] ...
   ```
   
   When packaged, it becomes more like this:
   
   ```
   class _BigTableReadFn(beam.DoFn):
     def __init__(self, project_id, instance_id, table_id, filter_=None):
       super(_BigTableReadFn, self).__init__()
       self._initialize({'project_id': project_id, 'instance_id': instance_id, 
'table_id': table_id, 'filter_': filter_})
   
     def _initialize(self, options):
       self._options = options
       self._table = None
       self._counter = Metrics.counter(self.__class__, 'Rows Read')
   
     def __getstate__(self):
       return self._options
   
     def __setstate__(self, options):
       self._initialize(options)
   
     def start_bundle(self):
       if self._table is None:
         _client = Client(project=self._options['project_id'])
         _instance = _client.instance(self._options['instance_id'])
         # noinspection PyAttributeOutsideInit
         self._table = _instance.table(self._options['table_id'])
   
     def process(self, source_bundle):
       _start_key = source_bundle.start_position
       _end_key = source_bundle.stop_position
       for row in self._table.read_rows(_start_key, _end_key):
         self._counter.inc()
         yield row
   
     def display_data(self):
       return {'projectId': DisplayDataItem(self._options['project_id'], 
label='Bigtable Project Id'),
               'instanceId': DisplayDataItem(self._options['instance_id'], 
label='Bigtable Instance Id'),
               'tableId': DisplayDataItem(self._options['table_id'], 
label='Bigtable Table Id')}
   
   
   class ReadFromBigTable(beam.PTransform):
     def __init__(self, project_id, instance_id, table_id, filter_=None):
       super(self.__class__, self).__init__()
       self._options = {'project_id': project_id,
                        'instance_id': instance_id,
                        'table_id': table_id,
                        'filter_': filter_}
   
     def expand(self, pbegin):
       table = Client(project=self._options['project_id'], admin=True) \
         .instance(instance_id=self._options['instance_id']) \
         .table(table_id=self._options['table_id'])
   
       keys = list(table.sample_row_keys())
       SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
       keys.insert(0, SampleRowKey(b'', 0))
   
       def bundles():
         for i in range(1, len(keys)):
           key_1 = keys[i - 1].row_key
           key_2 = keys[i].row_key
           size = keys[i].offset_bytes - keys[i - 1].offset_bytes
           yield iobase.SourceBundle(size, None, key_1, key_2)
   
       return (pbegin
               | 'Bundles' >> beam.Create(iter(bundles()))
               | 'Reshuffle' >> util.Reshuffle()
               | 'Read' >> 
beam.ParDo(_BigtableReadFn(self._options['project_id'], 
self._options['instance_id'], self._options['table_id']))
               )
   ```
   
   The latter breaks with Dataflow while still running under Direct. As you can 
see, the logic is nearly identical, suggesting that some magic might happen 
during [un]packaging.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 358293)
    Time Spent: 43.5h  (was: 43h 20m)

> Create a Cloud Bigtable IO connector for Python
> -----------------------------------------------
>
>                 Key: BEAM-3342
>                 URL: https://issues.apache.org/jira/browse/BEAM-3342
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Solomon Duskis
>            Assignee: Solomon Duskis
>            Priority: Major
>          Time Spent: 43.5h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to