[
https://issues.apache.org/jira/browse/BEAM-12904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kenneth Knowles updated BEAM-12904:
-----------------------------------
This Jira ticket has a pull request attached to it, but is still open. Did the
pull request resolve the issue? If so, could you please mark it resolved? This
will help the project have a clear view of its open issues.
> Python DataflowRunner always uses default app_profile_id when writing to
> BigTable when using custom write fn
> -------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-12904
> URL: https://issues.apache.org/jira/browse/BEAM-12904
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp, runner-dataflow
> Affects Versions: 2.28.0, 2.32.0
> Environment: Default Python SDK image for environment is
> apache/beam_python3.7_sdk:2.32.0
> Using provided Python SDK container image:
> gcr.io/cloud-dataflow/v1beta3/python37:2.32.0
> Reporter: Krzysztof Korzeniewski
> Priority: P2
> Labels: GCP
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
>
> There are two issues:
> 1. apache_beam.io.gcp.bigtableio.WriteToBigTable has no support for custom
> app profiles at all (see the sketch after this list).
> 2. I've added app profile support to a custom DoFn; it is passed correctly,
> works on DirectRunner, and even shows the correct parameters in the Dataflow
> logs, but it still uses the 'default' app_profile_id.
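> For reference, a minimal sketch of how the built-in transform is constructed
> today; as far as I can tell, in 2.32.0 its constructor only accepts the
> project, instance, and table IDs (the ID values below are hypothetical
> placeholders):
> {code:python}
> from apache_beam.io.gcp.bigtableio import WriteToBigTable
>
> # There is no app_profile_id parameter to pass here.
> write = WriteToBigTable(
>     project_id='my-project',
>     instance_id='my-instance',
>     table_id='my-table')
> {code}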
> It's easy to trigger just by passing a non-existent app_profile_id:
> DirectRunner crashes with an error, while DataflowRunner uses 'default' and
> crashes if 'default' uses multi-cluster routing and/or has transactional
> writes disabled. BigTable requires single-cluster routing to support
> transactional writes (read-modify-write, check-and-mutate), which is why I
> need a custom app_profile_id in one case.
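> For context, the routing policy of a given profile can be checked up front
> with the admin client, along the lines of the commented-out listing inside
> the DoFn below (a minimal sketch; the project and instance IDs are
> hypothetical placeholders, and the client must be created with admin=True):
> {code:python}
> from google.cloud.bigtable import Client
>
> client = Client(project='my-project', admin=True)  # hypothetical project id
> instance = client.instance('my-instance')          # hypothetical instance id
> for profile in instance.list_app_profiles():
>     # Single-cluster profiles report a cluster_id and can allow
>     # transactional writes; multi-cluster profiles cannot.
>     print(profile.name, profile.routing_policy_type,
>           profile.allow_transactional_writes)
> {code}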
> Custom write function:
> {code:python}
> from datetime import datetime, timezone
> import logging
>
> import apache_beam as beam
> from apache_beam.metrics import Metrics
> from apache_beam.transforms.display import DisplayDataItem
> from google.cloud.bigtable import Client, row_filters
>
>
> class BigTableWriteIfNotExistsConditionalFn(beam.DoFn):
>     def __init__(self, project_id, instance_id, app_profile_id, table_id,
>                  column_family, column: str):
>         super(BigTableWriteIfNotExistsConditionalFn, self).__init__()
>         self.beam_options = {
>             'project_id': project_id,
>             'instance_id': instance_id,
>             'app_profile_id': app_profile_id,
>             'table_id': table_id,
>             'column_family': column_family,
>             'column': column,
>         }
>         self.table = None
>         self.written = Metrics.counter(self.__class__, 'Written Row')
>
>     def __getstate__(self):
>         return self.beam_options
>
>     def __setstate__(self, options):
>         self.beam_options = options
>         self.table = None
>         self.written = Metrics.counter(self.__class__, 'Written Row')
>
>     def start_bundle(self):
>         if self.table is None:
>             client = Client(project=self.beam_options['project_id'])
>             instance = client.instance(self.beam_options['instance_id'])
>             # Add admin=True to the client initialization and uncomment below:
>             # for profile in instance.list_app_profiles():
>             #     logging.info('Profile name: %s', profile.name)
>             #     logging.info('Profile desc: %s', profile.description)
>             #     logging.info('Routing policy type: %s', profile.routing_policy_type)
>             #     logging.info('Cluster id: %s', profile.cluster_id)
>             #     logging.info('Transactional writes: %s',
>             #                  profile.allow_transactional_writes)
>             self.table = instance.table(
>                 table_id=self.beam_options['table_id'],
>                 app_profile_id=self.beam_options['app_profile_id'])
>
>     def process(self, kvmessage):
>         self.written.inc()
>         row_key, value = kvmessage
>         # Match only the target column family and qualifier.
>         row_filter = row_filters.RowFilterChain(filters=[
>             row_filters.FamilyNameRegexFilter(self.beam_options['column_family']),
>             row_filters.ColumnQualifierRegexFilter(self.beam_options['column']),
>         ])
>         bt_row = self.table.conditional_row(row_key=row_key, filter_=row_filter)
>         params = {
>             'column_family_id': self.beam_options['column_family'],
>             'column': self.beam_options['column'],
>             'value': value,
>             'timestamp': datetime.fromtimestamp(0, timezone.utc),
>             'state': False,  # apply the write only when the filter does NOT match
>         }
>         bt_row.set_cell(**params)
>         bt_row.commit()
>
>     def finish_bundle(self):
>         pass
>
>     def display_data(self):
>         return {
>             'projectId': DisplayDataItem(
>                 self.beam_options['project_id'], label='Bigtable Project Id'),
>             'instanceId': DisplayDataItem(
>                 self.beam_options['instance_id'], label='Bigtable Instance Id'),
>             'tableId': DisplayDataItem(
>                 self.beam_options['table_id'], label='Bigtable Table Id'),
>         }
> {code}
> It processes Tuple[str, str] elements, where the first string is the BigTable
> row_key and the second is the cell value.
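> For completeness, a minimal sketch of how the DoFn is applied in a pipeline
> (the IDs and the input tuples are hypothetical placeholders):
> {code:python}
> import apache_beam as beam
>
> with beam.Pipeline() as pipeline:
>     _ = (pipeline
>          | 'CreateKVs' >> beam.Create([('row-key-1', 'value-1')])
>          | 'WriteIfNotExists' >> beam.ParDo(
>              BigTableWriteIfNotExistsConditionalFn(
>                  project_id='my-project',          # hypothetical
>                  instance_id='my-instance',        # hypothetical
>                  app_profile_id='my-app-profile',  # single-cluster routing
>                  table_id='my-table',              # hypothetical
>                  column_family='cf',
>                  column='col')))
> {code}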
--
This message was sent by Atlassian Jira
(v8.20.1#820001)