[ https://issues.apache.org/jira/browse/BEAM-12904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krzysztof Korzeniewski updated BEAM-12904:
------------------------------------------
    Description: 
 

There are two problems:
 1. apache_beam.io.gcp.bigtableio.WriteToBigTable has no support for custom App Profiles at all.
 2. I added app-profile support via a custom DoFn; the parameter is passed through correctly, works on DirectRunner, and the passed values even show up in the Dataflow logs, but the Dataflow runner still writes with the 'default' app_profile_id.
 Bigtable requires single-cluster routing to support transactional writes (read-modify-write, check-and-mutate), which is why I need a custom app_profile_id in one case; see the sketch below.
 The bug is easy to trigger by passing a non-existent app_profile_id: DirectRunner crashes with an error, while the Dataflow runner silently uses 'default'.
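 For context, here is a minimal sketch of the two API surfaces involved. All names ('my-project', 'my-instance', 'my-single-cluster-profile', 'my-cluster', 'my-table') are illustrative placeholders, not values from this report: the built-in sink accepts no app_profile_id parameter, and a single-cluster-routing profile has to be created through the Admin API before Bigtable allows transactional writes.
{code:python}
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from google.cloud.bigtable import Client, enums

# 1. The built-in sink: only project/instance/table are accepted,
#    there is no way to pass an app_profile_id.
write_transform = WriteToBigTable(
    project_id='my-project',
    instance_id='my-instance',
    table_id='my-table')

# 2. Creating an app profile with single-cluster routing, which Bigtable
#    requires before it will allow transactional writes.
client = Client(project='my-project', admin=True)
instance = client.instance('my-instance')
profile = instance.app_profile(
    'my-single-cluster-profile',
    routing_policy_type=enums.RoutingPolicyType.SINGLE,
    cluster_id='my-cluster',
    allow_transactional_writes=True)
profile.create()
{code}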
 The custom write DoFn:
{code:python}
import logging
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.transforms.display import DisplayDataItem
from google.cloud.bigtable import Client
from google.cloud.bigtable import row_filters


class BigTableWriteIfNotExistsConditionalFn(beam.DoFn):
  def __init__(self, project_id, instance_id, app_profile_id, table_id,
               column_family, column: str):
    super(BigTableWriteIfNotExistsConditionalFn, self).__init__()
    self.beam_options = {
        'project_id': project_id,
        'instance_id': instance_id,
        'app_profile_id': app_profile_id,
        'table_id': table_id,
        'column_family': column_family,
        'column': column,
    }
    self.table = None
    self.written = Metrics.counter(self.__class__, 'Written Row')

  def __getstate__(self):
    return self.beam_options

  def __setstate__(self, options):
    self.beam_options = options
    self.table = None
    self.written = Metrics.counter(self.__class__, 'Written Row')

  def start_bundle(self):
    if self.table is None:
      client = Client(project=self.beam_options['project_id'])
      instance = client.instance(self.beam_options['instance_id'])

      # Add admin=True to the Client initialization and uncomment below
      # to log the app profiles visible from the worker:
      # for profile in instance.list_app_profiles():
      #   logging.info('Profile name: %s', profile.name)
      #   logging.info('Profile desc: %s', profile.description)
      #   logging.info('Routing policy type: %s', profile.routing_policy_type)
      #   logging.info('Cluster id: %s', profile.cluster_id)
      #   logging.info('Transactional writes: %s',
      #                profile.allow_transactional_writes)

      # This app_profile_id is honored by DirectRunner, but the Dataflow
      # runner still writes through 'default'.
      self.table = instance.table(
          table_id=self.beam_options['table_id'],
          app_profile_id=self.beam_options['app_profile_id'])

  def process(self, kvmessage):
    self.written.inc()

    row_key, value = kvmessage

    # Filter matching any existing cell in the target family/qualifier.
    row_filter = row_filters.RowFilterChain(filters=[
        row_filters.FamilyNameRegexFilter(self.beam_options['column_family']),
        row_filters.ColumnQualifierRegexFilter(self.beam_options['column']),
    ])
    bt_row = self.table.conditional_row(row_key=row_key, filter_=row_filter)
    params = {
        'column_family_id': self.beam_options['column_family'],
        'column': self.beam_options['column'],
        'value': value[self.beam_options['column']],
        'timestamp': datetime.fromtimestamp(0, timezone.utc),
        # state=False applies the mutation only when the filter does NOT
        # match, i.e. only when the cell does not exist yet.
        'state': False,
    }
    bt_row.set_cell(**params)
    bt_row.commit()

  def finish_bundle(self):
    pass

  def display_data(self):
    return {
        'projectId': DisplayDataItem(
            self.beam_options['project_id'], label='Bigtable Project Id'),
        'instanceId': DisplayDataItem(
            self.beam_options['instance_id'], label='Bigtable Instance Id'),
        'tableId': DisplayDataItem(
            self.beam_options['table_id'], label='Bigtable Table Id'),
    }
{code}
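 For completeness, a sketch of how the DoFn above is wired into a pipeline. The in-memory source and all names are illustrative placeholders, not values from this report; the DoFn consumes (row_key, value_dict) pairs where value_dict is keyed by column name.
{code:python}
import apache_beam as beam

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      # Placeholder source: emits (row_key, value_dict) pairs.
      | 'CreateRows' >> beam.Create([
          (b'row-1', {'my_column': b'payload'}),
      ])
      | 'ConditionalWrite' >> beam.ParDo(
          BigTableWriteIfNotExistsConditionalFn(
              project_id='my-project',
              instance_id='my-instance',
              app_profile_id='my-single-cluster-profile',
              table_id='my-table',
              column_family='my_family',
              column='my_column')))
{code}
 With a non-existent app_profile_id this fails immediately on DirectRunner, while the same pipeline on Dataflow keeps writing through 'default'.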
 


> Python DataflowRunner always uses the default app_profile_id when writing 
> to BigTable with a custom write fn
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12904
>                 URL: https://issues.apache.org/jira/browse/BEAM-12904
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp, runner-dataflow
>    Affects Versions: 2.28.0, 2.32.0
>         Environment: Default Python SDK image for environment is 
> apache/beam_python3.7_sdk:2.32.0
> Using provided Python SDK container image: 
> gcr.io/cloud-dataflow/v1beta3/python37:2.32.0
>            Reporter: Krzysztof Korzeniewski
>            Priority: P0
>


