damccorm opened a new issue, #21187:
URL: https://github.com/apache/beam/issues/21187

    
   
   There are two problems:
    1. apache_beam.io.gcp.bigtableio.WriteToBigTable has no support for custom
app profiles at all.
    2. I've added app profile support to a custom DoFn. The app_profile_id is
passed correctly and works on DirectRunner, and the Dataflow logs even show the
correct parameters, but DataflowRunner still uses the 'default' app_profile_id.
    It's easy to trigger: just pass a nonexistent app_profile_id.
DirectRunner crashes with an error, while DataflowRunner uses 'default' and
crashes if 'default' uses multi-cluster routing and/or has transactional writes
disabled.
    Bigtable requires single-cluster routing to support transactional
writes (read-modify-write, check-and-mutate). That's why I need a custom
app_profile_id in one case.
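
The single-cluster requirement can be met with a dedicated app profile. The following is a minimal sketch, assuming the google-cloud-bigtable admin API (`Instance.app_profile` and `AppProfile.create`). The helper name `create_single_cluster_profile` and its `routing_policy_type` override are hypothetical and not part of Beam or the client library.

```python
def create_single_cluster_profile(instance, app_profile_id, cluster_id,
                                  description='', routing_policy_type=None):
  """Create an app profile pinned to one cluster with transactional writes on.

  `instance` is expected to be a google.cloud.bigtable Instance obtained from
  a Client created with admin=True (assumption: admin access is available).
  """
  if routing_policy_type is None:
    # Imported lazily so the helper can be loaded without the client library.
    from google.cloud.bigtable import enums
    routing_policy_type = enums.RoutingPolicyType.SINGLE

  profile = instance.app_profile(
      app_profile_id,
      routing_policy_type=routing_policy_type,
      description=description,
      cluster_id=cluster_id,
      allow_transactional_writes=True)
  # ignore_warnings=True asks the service to create the profile even if it
  # would otherwise only return a warning.
  return profile.create(ignore_warnings=True)
```

It would be called once, outside the pipeline, with an instance taken from `Client(project=..., admin=True)`. The resulting profile ID is then the value passed as app_profile_id to the DoFn.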
    Custom write function:
   ```
   
   from datetime import datetime, timezone
   import logging

   import apache_beam as beam
   from apache_beam.metrics import Metrics
   from apache_beam.transforms.display import DisplayDataItem
   from google.cloud.bigtable import Client, row_filters


   class BigTableWriteIfNotExistsConditionalFn(beam.DoFn):
     def __init__(self, project_id, instance_id, app_profile_id, table_id,
                  column_family, column: str):
       super(BigTableWriteIfNotExistsConditionalFn, self).__init__()
       self.beam_options = {
           'project_id': project_id,
           'instance_id': instance_id,
           'app_profile_id': app_profile_id,
           'table_id': table_id,
           'column_family': column_family,
           'column': column,
       }
       self.table = None
       self.written = Metrics.counter(self.__class__, 'Written Row')

     def __getstate__(self):
       return self.beam_options

     def __setstate__(self, options):
       self.beam_options = options
       self.table = None
       self.written = Metrics.counter(self.__class__, 'Written Row')

     def start_bundle(self):
       if self.table is None:
         client = Client(project=self.beam_options['project_id'])
         instance = client.instance(self.beam_options['instance_id'])

         # add admin=True param in client initialization and uncomment below
         # for profile in instance.list_app_profiles():
         #   logging.info('Profile name: %s', profile.name)
         #   logging.info('Profile desc: %s', profile.description)
         #   logging.info('Routing policy type: %s', profile.routing_policy_type)
         #   logging.info('Cluster id: %s', profile.cluster_id)
         #   logging.info('Transactional writes: %s',
         #                profile.allow_transactional_writes)

         self.table = instance.table(
             table_id=self.beam_options['table_id'],
             app_profile_id=self.beam_options['app_profile_id'])

     def process(self, kvmessage):
       self.written.inc()

       row_key, value = kvmessage

       row_filter = row_filters.RowFilterChain(
           filters=[
               row_filters.FamilyNameRegexFilter(
                   self.beam_options['column_family']),
               row_filters.ColumnQualifierRegexFilter(
                   self.beam_options['column']),
           ])
       bt_row = self.table.conditional_row(row_key=row_key, filter_=row_filter)
       params = {
           'column_family_id': self.beam_options['column_family'],
           'column': self.beam_options['column'],
           'value': value,
           'timestamp': datetime.fromtimestamp(0, timezone.utc),
           'state': False,
       }
       bt_row.set_cell(**params)
       bt_row.commit()

     def finish_bundle(self):
       pass

     def display_data(self):
       return {
           'projectId': DisplayDataItem(
               self.beam_options['project_id'], label='Bigtable Project Id'),
           'instanceId': DisplayDataItem(
               self.beam_options['instance_id'], label='Bigtable Instance Id'),
           'tableId': DisplayDataItem(
               self.beam_options['table_id'], label='Bigtable Table Id'),
       }
   
   ```
   
    It processes Tuple[str, str] messages, where the first string is the
Bigtable row_key and the second is the cell value.
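
Since the DoFn is serialized before it runs on Dataflow, one cheap check is whether app_profile_id survives the `__getstate__`/`__setstate__` round trip. This is a minimal sketch that uses a hypothetical stand-in class `OptionsHolder` instead of the real DoFn, and the standard `pickle` module on the state dict. Beam's own pickler may behave differently, so a passing check only rules out the state methods themselves.

```python
import pickle


class OptionsHolder:
  """Hypothetical stand-in mirroring the DoFn's state methods."""

  def __init__(self, **options):
    self.beam_options = options
    self.table = None  # rebuilt lazily, never serialized

  def __getstate__(self):
    # Only the plain options dict is serialized.
    return self.beam_options

  def __setstate__(self, options):
    self.beam_options = options
    self.table = None


def round_trip(obj):
  """Mimic a pickler: serialize the state, then restore it on a new object."""
  state = pickle.loads(pickle.dumps(obj.__getstate__()))
  restored = obj.__class__.__new__(obj.__class__)
  restored.__setstate__(state)
  return restored


def survives_round_trip(obj, key='app_profile_id'):
  """Return True if `key` has the same value after the round trip."""
  return round_trip(obj).beam_options[key] == obj.beam_options[key]
```

If this returns True for the options in use, the value is not being lost by the state methods, which points the investigation toward how the Bigtable client is constructed on the worker.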
   
   Imported from Jira 
[BEAM-12904](https://issues.apache.org/jira/browse/BEAM-12904). Original Jira 
may contain additional context.
   Reported by: mitgath.

