[ https://issues.apache.org/jira/browse/BEAM-12904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-12904:
-----------------------------------

This Jira ticket has a pull request attached to it, but is still open. Did the 
pull request resolve the issue? If so, could you please mark it resolved? This 
will help the project have a clear view of its open issues.

> Python DataflowRunner always uses the default app_profile_id when writing to 
> BigTable with a custom write fn
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12904
>                 URL: https://issues.apache.org/jira/browse/BEAM-12904
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-gcp, runner-dataflow
>    Affects Versions: 2.28.0, 2.32.0
>         Environment: Default Python SDK image for environment is 
> apache/beam_python3.7_sdk:2.32.0
> Using provided Python SDK container image: 
> gcr.io/cloud-dataflow/v1beta3/python37:2.32.0
>            Reporter: Krzysztof Korzeniewski
>            Priority: P2
>              Labels: GCP
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
>  
> There are two issues:
>  1. apache_beam.io.gcp.bigtableio.WriteToBigTable has no support for custom 
> app profiles at all.
>  2. I've added support in a custom DoFn; the value is passed correctly and 
> works on DirectRunner, and the correct parameters even show up in the 
> Dataflow logs, but the job still uses the 'default' app_profile_id. 
>  It's easy to trigger just by passing a nonexistent app_profile_id: 
> DirectRunner crashes with an error, while DataflowRunner uses 'default' and 
> crashes if 'default' is multi-cluster routing and/or transactional writes are 
> disabled.
>  BigTable needs to use single-cluster routing to support transactional writes 
> (read-modify-write, check-and-mutate). That's why in one case I need to use a 
> custom app_profile_id.
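>  For reference, a minimal sketch of creating such an app profile with the 
> google-cloud-bigtable admin API; the project, instance, cluster, and profile 
> ids below are placeholders, not values from this report:
> {code:python}
> from google.cloud.bigtable import Client, enums
> 
> # Requires admin=True on the client; all ids here are hypothetical examples.
> client = Client(project='my-project', admin=True)
> instance = client.instance('my-instance')
> 
> # Single-cluster routing is required for transactional writes
> # (read-modify-write, check-and-mutate).
> app_profile = instance.app_profile(
>     app_profile_id='single-cluster-profile',
>     routing_policy_type=enums.RoutingPolicyType.SINGLE,
>     cluster_id='my-cluster-id',
>     allow_transactional_writes=True,
> )
> app_profile.create()
> {code}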
>  Custom write func:
> {code:python}
> from datetime import datetime, timezone
> import logging
> 
> import apache_beam as beam
> from apache_beam.metrics import Metrics
> from apache_beam.transforms.display import DisplayDataItem
> from google.cloud.bigtable import Client, row_filters
> 
> 
> class BigTableWriteIfNotExistsConditionalFn(beam.DoFn):
>   """Writes a cell to BigTable only if it does not already exist,
>   using a conditional (check-and-mutate) write."""
> 
>   def __init__(self, project_id, instance_id, app_profile_id, table_id,
>                column_family, column: str):
>     super(BigTableWriteIfNotExistsConditionalFn, self).__init__()
>     self.beam_options = {
>         'project_id': project_id,
>         'instance_id': instance_id,
>         'app_profile_id': app_profile_id,
>         'table_id': table_id,
>         'column_family': column_family,
>         'column': column,
>     }
>     self.table = None
>     self.written = Metrics.counter(self.__class__, 'Written Row')
> 
>   def __getstate__(self):
>     return self.beam_options
> 
>   def __setstate__(self, options):
>     self.beam_options = options
>     self.table = None
>     self.written = Metrics.counter(self.__class__, 'Written Row')
> 
>   def start_bundle(self):
>     if self.table is None:
>       client = Client(project=self.beam_options['project_id'])
>       instance = client.instance(self.beam_options['instance_id'])
>       # Add admin=True to the Client initialization and uncomment below to
>       # inspect the app profiles visible to the worker:
>       # for profile in instance.list_app_profiles():
>       #   logging.info('Profile name: %s', profile.name)
>       #   logging.info('Profile desc: %s', profile.description)
>       #   logging.info('Routing policy type: %s', profile.routing_policy_type)
>       #   logging.info('Cluster id: %s', profile.cluster_id)
>       #   logging.info('Transactional writes: %s',
>       #                profile.allow_transactional_writes)
>       self.table = instance.table(
>           table_id=self.beam_options['table_id'],
>           app_profile_id=self.beam_options['app_profile_id'])
> 
>   def process(self, kvmessage):
>     self.written.inc()
>     row_key, value = kvmessage
>     row_filter = row_filters.RowFilterChain(
>         filters=[
>             row_filters.FamilyNameRegexFilter(
>                 self.beam_options['column_family']),
>             row_filters.ColumnQualifierRegexFilter(
>                 self.beam_options['column']),
>         ])
>     # Conditional write: with state=False the mutation is applied only when
>     # the row does NOT match the filter, i.e. the cell does not exist yet.
>     bt_row = self.table.conditional_row(row_key=row_key, filter_=row_filter)
>     params = {
>         'column_family_id': self.beam_options['column_family'],
>         'column': self.beam_options['column'],
>         'value': value,
>         'timestamp': datetime.fromtimestamp(0, timezone.utc),
>         'state': False,
>     }
>     bt_row.set_cell(**params)
>     bt_row.commit()
> 
>   def finish_bundle(self):
>     pass
> 
>   def display_data(self):
>     return {
>         'projectId': DisplayDataItem(
>             self.beam_options['project_id'], label='Bigtable Project Id'),
>         'instanceId': DisplayDataItem(
>             self.beam_options['instance_id'], label='Bigtable Instance Id'),
>         'tableId': DisplayDataItem(
>             self.beam_options['table_id'], label='Bigtable Table Id'),
>     }
> {code}
>  It processes Tuple[str, str] elements, where the first string is the 
> BigTable row_key and the second is the cell value.
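>  For completeness, a sketch of how the DoFn is applied in a pipeline (the 
> project/instance/table ids and the 'tx-profile' app_profile_id are 
> placeholders); passing a nonexistent profile id here is what triggers the 
> behavior described above:
> {code:python}
> import apache_beam as beam
> 
> with beam.Pipeline() as p:
>   (p
>    | 'Create' >> beam.Create([('row-key-1', 'value-1')])
>    | 'WriteIfNotExists' >> beam.ParDo(
>        BigTableWriteIfNotExistsConditionalFn(
>            project_id='my-project',
>            instance_id='my-instance',
>            app_profile_id='tx-profile',  # ignored on DataflowRunner per this report
>            table_id='my-table',
>            column_family='cf',
>            column='col')))
> {code}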



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
