sjvanrossum commented on code in PR #34135: URL: https://github.com/apache/beam/pull/34135#discussion_r2116496755
########## sdks/python/apache_beam/io/gcp/bigquery_tools.py: ########## @@ -125,1753 +121,2023 @@ class FileFormat(object): - CSV = 'CSV' - JSON = 'NEWLINE_DELIMITED_JSON' - AVRO = 'AVRO' + CSV = "CSV" + JSON = "NEWLINE_DELIMITED_JSON" + AVRO = "AVRO" class ExportCompression(object): - GZIP = 'GZIP' - DEFLATE = 'DEFLATE' - SNAPPY = 'SNAPPY' - NONE = 'NONE' + GZIP = "GZIP" + DEFLATE = "DEFLATE" + SNAPPY = "SNAPPY" + NONE = "NONE" def default_encoder(obj): - if isinstance(obj, decimal.Decimal): - return str(obj) - elif isinstance(obj, bytes): - # on python 3 base64-encoded bytes are decoded to strings - # before being sent to BigQuery - return obj.decode('utf-8') - elif isinstance(obj, (datetime.date, datetime.time)): - return str(obj) - elif isinstance(obj, datetime.datetime): - return obj.isoformat() - - _LOGGER.error("Unable to serialize %r to JSON", obj) - raise TypeError( - "Object of type '%s' is not JSON serializable" % type(obj).__name__) + if isinstance(obj, decimal.Decimal): + return str(obj) + elif isinstance(obj, bytes): + # on python 3 base64-encoded bytes are decoded to strings + # before being sent to BigQuery + return obj.decode("utf-8") + elif isinstance(obj, (datetime.date, datetime.time)): + return str(obj) + elif isinstance(obj, datetime.datetime): + return obj.isoformat() def get_hashable_destination(destination): - """Parses a table reference into a (project, dataset, table) tuple. + """Parses a table reference into a (project, dataset, table) tuple. - Args: - destination: Either a TableReference object from the bigquery API. - The object has the following attributes: projectId, datasetId, and - tableId. Or a string representing the destination containing + Args: + destination: Either a TableReference object from the bigquery API. + The object has the following attributes: projectId, datasetId, and + tableId. Or a string representing the destination containing + 'PROJECT:DATASET.TABLE'. 
+ Returns: + A string representing the destination containing 'PROJECT:DATASET.TABLE'. - Returns: - A string representing the destination containing - 'PROJECT:DATASET.TABLE'. - """ - if isinstance(destination, TableReference): - return '%s:%s.%s' % ( - destination.projectId, destination.datasetId, destination.tableId) - else: - return destination + """ + if isinstance(destination, TableReference): + return "%s:%s.%s" % ( + destination.projectId, + destination.datasetId, + destination.tableId, + ) + else: + return destination -V = TypeVar('V') +V = TypeVar("V") def to_hashable_table_ref( - table_ref_elem_kv: Tuple[Union[str, TableReference], V]) -> Tuple[str, V]: - """Turns the key of the input tuple to its string representation. The key - should be either a string or a TableReference. + table_ref_elem_kv: Tuple[Union[str, TableReference], V], +) -> Tuple[str, V]: + """Turns the key of the input tuple to its string representation. The key + should be either a string or a TableReference. - Args: - table_ref_elem_kv: A tuple of table reference and element. + Args: + table_ref_elem_kv: A tuple of table reference and element. - Returns: - A tuple of string representation of input table and input element. - """ - table_ref = table_ref_elem_kv[0] - hashable_table_ref = get_hashable_destination(table_ref) - return (hashable_table_ref, table_ref_elem_kv[1]) + Returns: + A tuple of string representation of input table and input element. + """ + table_ref = table_ref_elem_kv[0] + hashable_table_ref = get_hashable_destination(table_ref) + return (hashable_table_ref, table_ref_elem_kv[1]) def parse_table_schema_from_json(schema_string): - """Parse the Table Schema provided as string. - - Args: - schema_string: String serialized table schema, should be a valid JSON. - - Returns: - A TableSchema of the BigQuery export from either the Query or the Table. 
- """ - try: - json_schema = json.loads(schema_string) - except JSONDecodeError as e: - raise ValueError( - 'Unable to parse JSON schema: %s - %r' % (schema_string, e)) - - def _parse_schema_field(field): - """Parse a single schema field from dictionary. + """Parse the Table Schema provided as string. Args: - field: Dictionary object containing serialized schema. + schema_string: String serialized table schema, should be a valid JSON. Returns: - A TableFieldSchema for a single column in BigQuery. + A TableSchema of the BigQuery export from either the Query or the Table. """ - schema = bigquery.TableFieldSchema() - schema.name = field['name'] - schema.type = field['type'] - if 'mode' in field: - schema.mode = field['mode'] - else: - schema.mode = 'NULLABLE' - if 'description' in field: - schema.description = field['description'] - if 'fields' in field: - schema.fields = [_parse_schema_field(x) for x in field['fields']] - return schema + try: + json_schema = json.loads(schema_string) + except JSONDecodeError as e: + raise ValueError("Unable to parse JSON schema: %s - %r" % + (schema_string, e)) + + def _parse_schema_field(field): + """Parse a single schema field from dictionary. + + Args: + field: Dictionary object containing serialized schema. + + Returns: + A TableFieldSchema for a single column in BigQuery. 
+ """ + schema = bigquery.TableFieldSchema() + schema.name = field["name"] + schema.type = field["type"] + if "mode" in field: + schema.mode = field["mode"] + else: + schema.mode = "NULLABLE" + if "description" in field: + schema.description = field["description"] + if "fields" in field: + schema.fields = [_parse_schema_field(x) for x in field["fields"]] + return schema - fields = [_parse_schema_field(f) for f in json_schema['fields']] - return bigquery.TableSchema(fields=fields) + fields = [_parse_schema_field(f) for f in json_schema["fields"]] + return bigquery.TableSchema(fields=fields) def parse_table_reference(table, dataset=None, project=None): - """Parses a table reference into a (project, dataset, table) tuple. - - Args: - table: The ID of the table. The ID must contain only letters - (a-z, A-Z), numbers (0-9), connectors (-_). If dataset argument is None - then the table argument must contain the entire table reference: - 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. This argument can be a - TableReference instance in which case dataset and project are - ignored and the reference is returned as a result. Additionally, for date - partitioned tables, appending '$YYYYmmdd' to the table name is supported, - e.g. 'DATASET.TABLE$YYYYmmdd'. - dataset: The ID of the dataset containing this table or null if the table - reference is specified entirely by the table argument. - project: The ID of the project containing this table or null if the table - reference is specified entirely by the table (and possibly dataset) - argument. - - Returns: - A TableReference object from the bigquery API. The object has the following - attributes: projectId, datasetId, and tableId. - If the input is a TableReference object, a new object will be returned. - - Raises: - ValueError: if the table reference as a string does not match the expected - format. 
- """ - - if isinstance(table, TableReference): - return TableReference( - projectId=table.projectId, - datasetId=table.datasetId, - tableId=table.tableId) - elif callable(table): - return table - elif isinstance(table, value_provider.ValueProvider): - return table - - table_reference = TableReference() - # If dataset argument is not specified, the expectation is that the - # table argument will contain a full table reference instead of just a - # table name. - if dataset is None: - pattern = ( - f'((?P<project>{_PROJECT_PATTERN})[:\\.])?' - f'(?P<dataset>{_DATASET_PATTERN})\\.(?P<table>{_TABLE_PATTERN})') - match = regex.fullmatch(pattern, table) - if not match: - raise ValueError( - 'Expected a table reference (PROJECT:DATASET.TABLE or ' - 'DATASET.TABLE) instead of %s.' % table) - table_reference.projectId = match.group('project') - table_reference.datasetId = match.group('dataset') - table_reference.tableId = match.group('table') - else: - table_reference.projectId = project - table_reference.datasetId = dataset - table_reference.tableId = table - return table_reference + """Parses a table reference into a (project, dataset, table) tuple. + + Args: + table: The ID of the table. The ID must contain only letters + (a-z, A-Z), numbers (0-9), connectors (-_). If dataset argument is None + then the table argument must contain the entire table reference: + 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. This argument can be a + TableReference instance in which case dataset and project are + ignored and the reference is returned as a result. Additionally, for date + partitioned tables, appending '$YYYYmmdd' to the table name is supported, + e.g. 'DATASET.TABLE$YYYYmmdd'. + dataset: The ID of the dataset containing this table or null if the table + reference is specified entirely by the table argument. + project: The ID of the project containing this table or null if the table + reference is specified entirely by the table (and possibly dataset) + argument. 
+ + Returns: + A TableReference object from the bigquery API. The object has the following + attributes: projectId, datasetId, and tableId. + If the input is a TableReference object, a new object will be returned. + + Raises: + ValueError: if the table reference as a string does not match the expected + format. + """ + + if isinstance(table, TableReference): + return TableReference(projectId=table.projectId, + datasetId=table.datasetId, + tableId=table.tableId) + elif callable(table): + return table + elif isinstance(table, value_provider.ValueProvider): + return table + + table_reference = TableReference() + # If dataset argument is not specified, the expectation is that the + # table argument will contain a full table reference instead of just a + # table name. + if dataset is None: + pattern = ( + f"((?P<project>{_PROJECT_PATTERN})[:\\.])?" + f"(?P<dataset>{_DATASET_PATTERN})\\.(?P<table>{_TABLE_PATTERN})") + match = regex.fullmatch(pattern, table) + if not match: + raise ValueError( + "Expected a table reference (PROJECT:DATASET.TABLE or " + "DATASET.TABLE) instead of %s." % table) + table_reference.projectId = match.group("project") + table_reference.datasetId = match.group("dataset") + table_reference.tableId = match.group("table") + else: + table_reference.projectId = project + table_reference.datasetId = dataset + table_reference.tableId = table + return table_reference # ----------------------------------------------------------------------------- # BigQueryWrapper. 
def _build_job_labels(input_labels): - """Builds job label protobuf structure.""" - input_labels = input_labels or {} - result = bigquery.JobConfiguration.LabelsValue() - - for k, v in input_labels.items(): - result.additionalProperties.append( - bigquery.JobConfiguration.LabelsValue.AdditionalProperty( - key=k, - value=v, - )) - return result + """Builds job label protobuf structure.""" + input_labels = input_labels or {} + result = bigquery.JobConfiguration.LabelsValue() + + for k, v in input_labels.items(): + result.additionalProperties.append( + bigquery.JobConfiguration.LabelsValue.AdditionalProperty( + key=k, + value=v, + )) + return result def _build_dataset_labels(input_labels): - """Builds dataset label protobuf structure.""" - input_labels = input_labels or {} - result = bigquery.Dataset.LabelsValue() - - for k, v in input_labels.items(): - result.additionalProperties.append( - bigquery.Dataset.LabelsValue.AdditionalProperty( - key=k, - value=v, - )) - return result + """Builds dataset label protobuf structure.""" + input_labels = input_labels or {} + result = bigquery.Dataset.LabelsValue() + + for k, v in input_labels.items(): + result.additionalProperties.append( + bigquery.Dataset.LabelsValue.AdditionalProperty( + key=k, + value=v, + )) + return result def _build_filter_from_labels(labels): - filter_str = '' - for key, value in labels.items(): - filter_str += 'labels.' + key + ':' + value + ' ' - return filter_str + filter_str = "" + for key, value in labels.items(): + filter_str += "labels." + key + ":" + value + " " + return filter_str -class BigQueryWrapper(object): - """BigQuery client wrapper with utilities for querying. - - The wrapper is used to organize all the BigQuery integration points and - offer a common place where retry logic for failures can be controlled. - In addition, it offers various functions used both in sources and sinks - (e.g., find and create tables, query a table, etc.). 
- - Note that client parameter in constructor is only for testing purposes and - should not be used in production code. - """ - - # If updating following names, also update the corresponding pydocs in - # bigquery.py. - TEMP_TABLE = 'beam_temp_table_' - TEMP_DATASET = 'beam_temp_dataset_' - - HISTOGRAM_METRIC_LOGGER = MetricLogger() - - def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None): - self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions()) - self.gcp_bq_client = client or gcp_bigquery.Client( - client_info=ClientInfo( - user_agent="apache-beam-%s" % apache_beam.__version__)) - - self._unique_row_id = 0 - # For testing scenarios where we pass in a client we do not want a - # randomized prefix for row IDs. - self._row_id_prefix = '' if client else uuid.uuid4() - self._latency_histogram_metric = Metrics.histogram( - self.__class__, - 'latency_histogram_ms', - LinearBucket(0, 20, 3000), - BigQueryWrapper.HISTOGRAM_METRIC_LOGGER) - - if temp_dataset_id is not None and temp_table_ref is not None: - raise ValueError( - 'Both a BigQuery temp_dataset_id and a temp_table_ref were specified.' 
- ' Please specify only one of these.') - - if temp_dataset_id and temp_dataset_id.startswith(self.TEMP_DATASET): - raise ValueError( - 'User provided temp dataset ID cannot start with %r' % - self.TEMP_DATASET) - - if temp_table_ref is not None: - self.temp_table_ref = temp_table_ref - self.temp_dataset_id = temp_table_ref.datasetId - else: - self.temp_table_ref = None - self._temporary_table_suffix = uuid.uuid4().hex - self.temp_dataset_id = temp_dataset_id or self._get_temp_dataset() +# Module-level table definition cache constants and variables +_TABLE_DEFINITION_TTL = 1 # 1 second (default) +_TABLE_CACHE_MAX_SIZE = 1000 # Default maximum cache size +_TABLE_CACHE = cachetools.TTLCache(maxsize=_TABLE_CACHE_MAX_SIZE, + ttl=_TABLE_DEFINITION_TTL) +_CACHE_ENABLED = True # Flag to track if caching is enabled (TTL > 0) - self.created_temp_dataset = False - @property - def unique_row_id(self): - """Returns a unique row ID (str) used to avoid multiple insertions. +class BigQueryWrapper(object): + """BigQuery client wrapper with utilities for querying. - If the row ID is provided, BigQuery will make a best effort to not insert - the same row multiple times for fail and retry scenarios in which the insert - request may be issued several times. This comes into play for sinks executed - in a local runner. + The wrapper is used to organize all the BigQuery integration points and + offer a common place where retry logic for failures can be controlled. + In addition, it offers various functions used both in sources and sinks + (e.g., find and create tables, query a table, etc.). 
- Returns: - a unique row ID string - """ - self._unique_row_id += 1 - return '%s_%d' % (self._row_id_prefix, self._unique_row_id) - - def _get_temp_table(self, project_id): - if self.temp_table_ref: - return self.temp_table_ref - - return parse_table_reference( - table=BigQueryWrapper.TEMP_TABLE + self._temporary_table_suffix, - dataset=self.temp_dataset_id, - project=project_id) - - def _get_temp_dataset(self): - if self.temp_table_ref: - return self.temp_table_ref.datasetId - return BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def get_query_location(self, project_id, query, use_legacy_sql): + Note that client parameter in constructor is only for testing purposes and + should not be used in production code. """ - Get the location of tables referenced in a query. - This method returns the location of the first available referenced - table for user in the query and depends on the BigQuery service to - provide error handling for queries that reference tables in multiple - locations. - """ - reference = bigquery.JobReference( - jobId=uuid.uuid4().hex, projectId=project_id) - request = bigquery.BigqueryJobsInsertRequest( - projectId=project_id, - job=bigquery.Job( - configuration=bigquery.JobConfiguration( - dryRun=True, - query=bigquery.JobConfigurationQuery( - query=query, - useLegacySql=use_legacy_sql, - )), - jobReference=reference)) - - response = self.client.jobs.Insert(request) - - if response.statistics is None: - # This behavior is only expected in tests - _LOGGER.warning( - "Unable to get location, missing response.statistics. 
Query: %s", - query) - return None - - referenced_tables = response.statistics.query.referencedTables - if referenced_tables: # Guards against both non-empty and non-None - for table in referenced_tables: - try: - location = self.get_table_location( - table.projectId, table.datasetId, table.tableId) - except HttpForbiddenError: - # Permission access for table (i.e. from authorized_view), - # try next one - continue - _LOGGER.info( - "Using location %r from table %r referenced by query %s", - location, - table, - query) - return location - - _LOGGER.debug( - "Query %s does not reference any tables or " - "you don't have permission to inspect them.", - query) - return None - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def _insert_copy_job( - self, - project_id, - job_id, - from_table_reference, - to_table_reference, - create_disposition=None, - write_disposition=None, - job_labels=None): - reference = bigquery.JobReference() - reference.jobId = job_id - reference.projectId = project_id - request = bigquery.BigqueryJobsInsertRequest( - projectId=project_id, - job=bigquery.Job( - configuration=bigquery.JobConfiguration( - copy=bigquery.JobConfigurationTableCopy( - destinationTable=to_table_reference, - sourceTable=from_table_reference, - createDisposition=create_disposition, - writeDisposition=write_disposition, - ), - labels=_build_job_labels(job_labels), - ), - jobReference=reference, - )) - - return self._start_job(request).jobReference - - @retry.with_exponential_backoff( - num_retries=MAX_RETRIES, - retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def _insert_load_job( - self, - project_id, - job_id, - table_reference, - source_uris=None, - source_stream=None, - schema=None, - write_disposition=None, - create_disposition=None, - additional_load_parameters=None, - source_format=None, - job_labels=None): - - if not source_uris and not source_stream: - _LOGGER.warning( 
- 'Both source URIs and source stream are not provided. BigQuery load ' - 'job will not load any data.') - - if source_uris and source_stream: - raise ValueError( - 'Only one of source_uris and source_stream may be specified. ' - 'Got both.') - - if source_uris is None: - source_uris = [] - - additional_load_parameters = additional_load_parameters or {} - job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema - reference = bigquery.JobReference(jobId=job_id, projectId=project_id) - request = bigquery.BigqueryJobsInsertRequest( - projectId=project_id, - job=bigquery.Job( - configuration=bigquery.JobConfiguration( - load=bigquery.JobConfigurationLoad( - sourceUris=source_uris, - destinationTable=table_reference, - schema=job_schema, - writeDisposition=write_disposition, - createDisposition=create_disposition, - sourceFormat=source_format, - useAvroLogicalTypes=True, - autodetect=schema == 'SCHEMA_AUTODETECT', - **additional_load_parameters), - labels=_build_job_labels(job_labels), - ), - jobReference=reference, - )) - return self._start_job(request, stream=source_stream).jobReference - - @staticmethod - def _parse_location_from_exc(content, job_id): - """Parse job location from Exception content.""" - if isinstance(content, bytes): - content = content.decode('ascii', 'replace') - # search for "Already Exists: Job <project-id>:<location>.<job id>" - m = re.search(r"Already Exists: Job \S+\:(\S+)\." + job_id, content) - if not m: - _LOGGER.warning( - "Not able to parse BigQuery load job location for %s", job_id) - return None - return m.group(1) - - def _start_job( - self, - request: 'bigquery.BigqueryJobsInsertRequest', - stream=None, - ): - """Inserts a BigQuery job. - - If the job exists already, it returns it. + TEMP_TABLE = "beam_temp_table_" + TEMP_DATASET = "beam_temp_dataset_" Review Comment: ```suggestion # If updating following names, also update the corresponding pydocs in # bigquery.py. 
TEMP_TABLE = "beam_temp_table_"
TEMP_DATASET = "beam_temp_dataset_"
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org