This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 30fd958f5fc feat(bigquery): add GEOGRAPHY type support for BigQuery I/O (#36121)
30fd958f5fc is described below
commit 30fd958f5fc92ef2a7e069763f43077e32c63873
Author: liferoad <[email protected]>
AuthorDate: Wed Oct 15 14:47:47 2025 -0400
feat(bigquery): add GEOGRAPHY type support for BigQuery I/O (#36121)
* feat(bigquery): add GEOGRAPHY type support for BigQuery I/O
Add support for BigQuery GEOGRAPHY type which works with Well-Known Text
(WKT) format. The change includes:
- Adding GEOGRAPHY to type mappings in bigquery_tools and
bigquery_schema_tools
- Implementing GeographyType logical type in schemas.py
- Adding comprehensive tests for GEOGRAPHY type conversion and schema
integration
* fixed tests
* tests
* fixed tests
* fixes language_type
* fixed logical type
* urns
* add BQ IT
* yapf
* feat(bigquery): add project handling and test improvements
- Add _get_project method to handle project billing in BigQuery source
- Update tests to explicitly specify project parameter
- Fix geography test data formats and simplify test cases
- Add temporary storage location for file load tests
* lint
* format
* removed GeographyType for now
* restore schemas.py
* added uses_gcp_java_expansion_service
---
sdks/python/apache_beam/io/gcp/bigquery.py | 13 +
.../io/gcp/bigquery_geography_it_test.py | 540 +++++++++++++++++++++
.../apache_beam/io/gcp/bigquery_schema_tools.py | 3 +-
.../io/gcp/bigquery_schema_tools_test.py | 134 ++++-
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 1 +
.../apache_beam/io/gcp/bigquery_tools_test.py | 154 ++++++
6 files changed, 841 insertions(+), 4 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 0905ba764de..7d5dd876bda 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1029,6 +1029,16 @@ class _CustomBigQueryStorageSource(BoundedSource):
self._step_name = step_name
self._source_uuid = unique_id
+ def _get_project(self):
+ """Returns the project that queries and exports will be billed to."""
+ if self.pipeline_options:
+ project = self.pipeline_options.view_as(GoogleCloudOptions).project
+ if isinstance(project, vp.ValueProvider):
+ project = project.get()
+ if project:
+ return project
+ return self.project
+
def _get_parent_project(self):
"""Returns the project that will be billed."""
if self.temp_table:
@@ -1164,6 +1174,9 @@ class _CustomBigQueryStorageSource(BoundedSource):
self._setup_temporary_dataset(bq)
self.table_reference = self._execute_query(bq)
+ if not self.table_reference.projectId:
+ self.table_reference.projectId = self._get_project()
+
requested_session = bq_storage.types.ReadSession()
requested_session.table = 'projects/{}/datasets/{}/tables/{}'.format(
self.table_reference.projectId,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py
new file mode 100644
index 00000000000..5a506d3162f
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py
@@ -0,0 +1,540 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Integration tests for BigQuery GEOGRAPHY data type support."""
+
+import logging
+import secrets
+import time
+import unittest
+
+import hamcrest as hc
+import pytest
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigquery import ReadFromBigQuery
+from apache_beam.io.gcp.bigquery import WriteToBigQuery
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+try:
+ from apitools.base.py.exceptions import HttpError
+except ImportError:
+ HttpError = None
+
+_LOGGER = logging.getLogger(__name__)
+
+
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class BigQueryGeographyIntegrationTests(unittest.TestCase):
+ """Integration tests for BigQuery GEOGRAPHY data type."""
+
+ BIG_QUERY_DATASET_ID = 'python_geography_it_test_'
+
+ def setUp(self):
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+ self.runner_name = type(self.test_pipeline.runner).__name__
+ self.project = self.test_pipeline.get_option('project')
+
+ self.bigquery_client = BigQueryWrapper()
+ self.dataset_id = '%s%d%s' % (
+ self.BIG_QUERY_DATASET_ID, int(time.time()), secrets.token_hex(3))
+ self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+ _LOGGER.info(
+ "Created dataset %s in project %s", self.dataset_id, self.project)
+
+ def tearDown(self):
+ request = bigquery.BigqueryDatasetsDeleteRequest(
+ projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
+ try:
+ _LOGGER.info(
+ "Deleting dataset %s in project %s", self.dataset_id, self.project)
+ self.bigquery_client.client.datasets.Delete(request)
+ except HttpError:
+ _LOGGER.debug(
+ 'Failed to clean up dataset %s in project %s',
+ self.dataset_id,
+ self.project)
+
+ def create_geography_table(self, table_name, include_repeated=False):
+ """Create a table with various GEOGRAPHY field configurations."""
+ table_schema = bigquery.TableSchema()
+
+ # ID field
+ id_field = bigquery.TableFieldSchema()
+ id_field.name = 'id'
+ id_field.type = 'INTEGER'
+ id_field.mode = 'REQUIRED'
+ table_schema.fields.append(id_field)
+
+ # Required GEOGRAPHY field
+ geo_required = bigquery.TableFieldSchema()
+ geo_required.name = 'location'
+ geo_required.type = 'GEOGRAPHY'
+ geo_required.mode = 'REQUIRED'
+ table_schema.fields.append(geo_required)
+
+ # Nullable GEOGRAPHY field
+ geo_nullable = bigquery.TableFieldSchema()
+ geo_nullable.name = 'optional_location'
+ geo_nullable.type = 'GEOGRAPHY'
+ geo_nullable.mode = 'NULLABLE'
+ table_schema.fields.append(geo_nullable)
+
+ if include_repeated:
+ # Repeated GEOGRAPHY field
+ geo_repeated = bigquery.TableFieldSchema()
+ geo_repeated.name = 'path'
+ geo_repeated.type = 'GEOGRAPHY'
+ geo_repeated.mode = 'REPEATED'
+ table_schema.fields.append(geo_repeated)
+
+ table = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId=self.project,
+ datasetId=self.dataset_id,
+ tableId=table_name),
+ schema=table_schema)
+ request = bigquery.BigqueryTablesInsertRequest(
+ projectId=self.project, datasetId=self.dataset_id, table=table)
+ self.bigquery_client.client.tables.Insert(request)
+
+ # Wait for table to be available
+ _ = self.bigquery_client.get_table(
+ self.project, self.dataset_id, table_name)
+
+ @pytest.mark.it_postcommit
+ def test_geography_write_and_read_basic_geometries(self):
+ """Test writing and reading basic GEOGRAPHY geometries."""
+ table_name = 'geography_basic_geometries'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ # Test data with various WKT geometry types
+ input_data = [
+ {
+ 'id': 1,
+ 'location': 'POINT(30 10)',
+ 'optional_location': ('POINT(-122.4194 37.7749)') # San Francisco
+ },
+ {
+ 'id': 2,
+ 'location': 'LINESTRING(30 10, 10 30, 40 40)',
+ 'optional_location': None
+ },
+ {
+ 'id': 3,
+ 'location': ('POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))'),
+ 'optional_location': ('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')
+ },
+ {
+ 'id': 4,
+ 'location': ('MULTIPOINT((10 40), (40 30), (20 20), (30 10))'),
+ 'optional_location': 'POINT(0 0)'
+ },
+ {
+ 'id': 5,
+ 'location': (
+ 'MULTILINESTRING((10 10, 20 20, 10 40), '
+ '(40 40, 30 30, 40 20, 30 10))'),
+ 'optional_location': None
+ }
+ ]
+
+ table_schema = {
+ "fields": [{
+ "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+ }, {
+ "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+ },
+ {
+ "name": "optional_location",
+ "type": "GEOGRAPHY",
+ "mode": "NULLABLE"
+ }]
+ }
+
+ # Write data to BigQuery
+ with TestPipeline(is_integration_test=True) as p:
+ _ = (
+ p
+ | 'CreateData' >> beam.Create(input_data)
+ | 'WriteToBQ' >> WriteToBigQuery(
+ table=table_id,
+ schema=table_schema,
+ method=WriteToBigQuery.Method.STREAMING_INSERTS,
+ project=self.project))
+
+ # Read data back and verify
+ with TestPipeline(is_integration_test=True) as p:
+ result = (
+ p
+ | 'ReadFromBQ' >> ReadFromBigQuery(
+ table=table_id,
+ project=self.project,
+ method=ReadFromBigQuery.Method.DIRECT_READ)
+ | 'ExtractGeography' >> beam.Map(
+ lambda row:
+ (row['id'], row['location'], row['optional_location'])))
+
+ expected_data = [
+ (1, 'POINT(30 10)', 'POINT(-122.4194 37.7749)'),
+ (2, 'LINESTRING(30 10, 10 30, 40 40)', None),
+ (
+ 3,
+ 'POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))',
+ 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'),
+ (4, 'MULTIPOINT(20 20, 10 40, 40 30, 30 10)', 'POINT(0 0)'),
+ (
+ 5,
+ 'MULTILINESTRING((10 10, 20 20, 10 40), '
+ '(40 40, 30 30, 40 20, 30 10))',
+ None)
+ ]
+
+ assert_that(result, equal_to(expected_data))
+
+ @pytest.mark.it_postcommit
+ def test_geography_write_with_beam_rows(self):
+ """Test writing GEOGRAPHY data using Beam Rows with GeographyType."""
+ table_name = 'geography_beam_rows'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ # Create the table first
+ self.create_geography_table(table_name)
+
+ # Create Beam Rows with GeographyType
+ row_elements = [
+ beam.Row(id=1, location='POINT(1 1)', optional_location='POINT(2 2)'),
+ beam.Row(
+ id=2, location='LINESTRING(0 0, 1 1, 2 2)', optional_location=None),
+ beam.Row(
+ id=3,
+ location='POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))',
+ optional_location='POINT(3 3)')
+ ]
+
+ # Expected data for verification
+ expected_data = [(1, 'POINT(1 1)', 'POINT(2 2)'),
+ (2, 'LINESTRING(0 0, 1 1, 2 2)', None),
+ (3, 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', 'POINT(3 3)')]
+
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query=(
+ "SELECT id, location, optional_location FROM %s ORDER BY id" %
+ table_id),
+ data=expected_data)
+ ]
+
+ args = self.test_pipeline.get_full_options_as_args()
+
+ with beam.Pipeline(argv=args) as p:
+ _ = (
+ p
+ | 'CreateRows' >> beam.Create(row_elements)
+ | 'ConvertToDict' >> beam.Map(
+ lambda row: {
+ 'id': row.id, 'location': row.location,
+ 'optional_location': row.optional_location
+ })
+ | 'WriteToBQ' >> WriteToBigQuery(
+ table=table_id,
+ method=WriteToBigQuery.Method.STREAMING_INSERTS,
+ schema={
+ "fields": [{
+ "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+ },
+ {
+ "name": "location",
+ "type": "GEOGRAPHY",
+ "mode": "REQUIRED"
+ },
+ {
+ "name": "optional_location",
+ "type": "GEOGRAPHY",
+ "mode": "NULLABLE"
+ }]
+ }))
+
+ # Wait a bit for streaming inserts to complete
+ time.sleep(5)
+
+ # Verify the data was written correctly
+ hc.assert_that(None, hc.all_of(*pipeline_verifiers))
+
+ @pytest.mark.it_postcommit
+ def test_geography_repeated_fields(self):
+ """Test GEOGRAPHY fields with REPEATED mode."""
+ table_name = 'geography_repeated'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ input_data = [
+ {
+ 'id': 1,
+ 'location': 'POINT(0 0)',
+ 'optional_location': 'POINT(1 1)',
+ 'path': ['POINT(0 0)', 'POINT(1 1)', 'POINT(2 2)']
+ },
+ {
+ 'id': 2,
+ 'location': 'POINT(10 10)',
+ 'optional_location': None,
+ 'path': ['LINESTRING(0 0, 5 5)', 'LINESTRING(5 5, 10 10)']
+ },
+ {
+ 'id': 3,
+ 'location': 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))',
+ 'optional_location': 'POINT(0.5 0.5)',
+ 'path': [] # Empty array
+ }
+ ]
+
+ table_schema = {
+ "fields": [{
+ "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+ }, {
+ "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+ },
+ {
+ "name": "optional_location",
+ "type": "GEOGRAPHY",
+ "mode": "NULLABLE"
+ }, {
+ "name": "path", "type": "GEOGRAPHY", "mode": "REPEATED"
+ }]
+ }
+
+ # Write data
+ args = self.test_pipeline.get_full_options_as_args()
+ with beam.Pipeline(argv=args) as p:
+ _ = (
+ p
+ | 'CreateData' >> beam.Create(input_data)
+ | 'WriteToBQ' >> WriteToBigQuery(
+ table=table_id,
+ schema=table_schema,
+ method=WriteToBigQuery.Method.STREAMING_INSERTS))
+
+ # Read and verify
+ with beam.Pipeline(argv=args) as p:
+ result = (
+ p
+ | 'ReadFromBQ' >> ReadFromBigQuery(
+ table=table_id,
+ method=ReadFromBigQuery.Method.DIRECT_READ,
+ project=self.project)
+ | 'ExtractData' >> beam.Map(
+ lambda row: (row['id'], len(row['path']) if row['path'] else 0)))
+
+ expected_counts = [(1, 3), (2, 2), (3, 0)]
+ assert_that(result, equal_to(expected_counts))
+
+ @pytest.mark.it_postcommit
+ def test_geography_complex_geometries(self):
+ """Test complex GEOGRAPHY geometries and edge cases."""
+ table_name = 'geography_complex'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ # Complex geometries including collections and high precision coordinates
+ input_data = [
+ {
+ 'id': 1,
+ 'location': (
+ 'GEOMETRYCOLLECTION(POINT(4 6), LINESTRING(4 6, 7 10))'),
+ 'optional_location': None
+ },
+ {
+ 'id': 2,
+ 'location': (
+ 'MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), '
+ '((2 2, 3 2, 3 3, 2 3, 2 2)))'), # Fixed orientation
+ 'optional_location': ('POINT(-122.419416 37.774929)'
+ ) # High precision
+ },
+ {
+ 'id': 3,
+ 'location': ('POLYGON((0 0, 0 5, 5 5, 5 0, 0 0))'
+ ), # Simple polygon without holes
+ 'optional_location': ('LINESTRING(-122 37, -121 38)'
+ ) # Fixed non-antipodal coordinates
+ }
+ ]
+
+ table_schema = {
+ "fields": [{
+ "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+ }, {
+ "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+ },
+ {
+ "name": "optional_location",
+ "type": "GEOGRAPHY",
+ "mode": "NULLABLE"
+ }]
+ }
+
+ expected_data = [(1, 'LINESTRING(4 6, 7 10)', None),
+ (
+ 2,
+ 'MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), '
+ '((2 2, 3 2, 3 3, 2 3, 2 2)))',
+ 'POINT(-122.419416 37.774929)'),
+ (
+ 3,
+ 'POLYGON((0 0, 0 5, 5 5, 5 0, 0 0))',
+ 'LINESTRING(-122 37, -121 38)')]
+
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query=(
+ "SELECT id, location, optional_location FROM %s ORDER BY id" %
+ table_id),
+ data=expected_data)
+ ]
+
+ args = self.test_pipeline.get_full_options_as_args()
+
+ with beam.Pipeline(argv=args) as p:
+ _ = (
+ p
+ | 'CreateData' >> beam.Create(input_data)
+ | 'WriteToBQ' >> WriteToBigQuery(
+ table=table_id,
+ schema=table_schema,
+ method=WriteToBigQuery.Method.STREAMING_INSERTS))
+
+ hc.assert_that(p, hc.all_of(*pipeline_verifiers))
+
+ @pytest.mark.uses_gcp_java_expansion_service
+ @pytest.mark.it_postcommit
+ def test_geography_storage_write_api(self):
+ """Test GEOGRAPHY with Storage Write API method."""
+ table_name = 'geography_storage_write'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ input_data = [{
+ 'id': 1, 'location': 'POINT(0 0)', 'optional_location': 'POINT(1 1)'
+ },
+ {
+ 'id': 2,
+ 'location': 'LINESTRING(0 0, 1 1)',
+ 'optional_location': None
+ }]
+
+ table_schema = {
+ "fields": [{
+ "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+ }, {
+ "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+ },
+ {
+ "name": "optional_location",
+ "type": "GEOGRAPHY",
+ "mode": "NULLABLE"
+ }]
+ }
+
+ expected_data = [(1, 'POINT(0 0)', 'POINT(1 1)'),
+ (2, 'LINESTRING(0 0, 1 1)', None)]
+
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query=(
+ "SELECT id, location, optional_location FROM %s ORDER BY id" %
+ table_id),
+ data=expected_data)
+ ]
+
+ args = self.test_pipeline.get_full_options_as_args()
+
+ with beam.Pipeline(argv=args) as p:
+ _ = (
+ p
+ | 'CreateData' >> beam.Create(input_data)
+ | 'WriteToBQ' >> WriteToBigQuery(
+ table=table_id,
+ schema=table_schema,
+ method=WriteToBigQuery.Method.STORAGE_WRITE_API))
+
+ hc.assert_that(p, hc.all_of(*pipeline_verifiers))
+
+ @pytest.mark.it_postcommit
+ def test_geography_file_loads_method(self):
+ """Test GEOGRAPHY with FILE_LOADS method."""
+ table_name = 'geography_file_loads'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ input_data = [
+ {
+ 'id': i,
+ 'location': f'POINT({i} {i})',
+ 'optional_location': (
+ f'POINT({i+10} {i+10})' if i % 2 == 0 else None)
+ } for i in range(1, 11) # 10 records
+ ]
+
+ table_schema = {
+ "fields": [{
+ "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+ }, {
+ "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+ },
+ {
+ "name": "optional_location",
+ "type": "GEOGRAPHY",
+ "mode": "NULLABLE"
+ }]
+ }
+
+ # Verify count and some sample data
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT COUNT(*) as count FROM %s" % table_id,
+ data=[(10, )])
+ ]
+
+ args = self.test_pipeline.get_full_options_as_args()
+ gcs_temp_location = (
+ f'gs://temp-storage-for-end-to-end-tests/'
+ f'bq_it_test_{int(time.time())}')
+
+ with beam.Pipeline(argv=args) as p:
+ _ = (
+ p
+ | 'CreateData' >> beam.Create(input_data)
+ | 'WriteToBQ' >> WriteToBigQuery(
+ table=table_id,
+ schema=table_schema,
+ method=WriteToBigQuery.Method.FILE_LOADS,
+ custom_gcs_temp_location=gcs_temp_location))
+
+ hc.assert_that(p, hc.all_of(*pipeline_verifiers))
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
index beb373a7dea..54c7ca90f01 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
@@ -47,7 +47,8 @@ BIG_QUERY_TO_PYTHON_TYPES = {
"FLOAT": np.float64,
"BOOLEAN": bool,
"BYTES": bytes,
- "TIMESTAMP": apache_beam.utils.timestamp.Timestamp
+ "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
+ "GEOGRAPHY": str,
#TODO(https://github.com/apache/beam/issues/20810):
# Finish mappings for all BQ types
}
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
index 7ae49dff205..0eb3351ee84 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
@@ -21,6 +21,7 @@ import unittest.mock
import mock
import numpy as np
+import apache_beam as beam
import apache_beam.io.gcp.bigquery
from apache_beam.io.gcp import bigquery_schema_tools
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -209,6 +210,133 @@ class TestBigQueryToSchema(unittest.TestCase):
query='SELECT name FROM dataset.sample_table',
output_type='BEAM_ROW')
- if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
+ def test_geography_type_support(self):
+ """Test that GEOGRAPHY type is properly supported in schema conversion."""
+ fields = [
+ bigquery.TableFieldSchema(
+ name='location', type='GEOGRAPHY', mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name='locations', type='GEOGRAPHY', mode="REPEATED"),
+ bigquery.TableFieldSchema(
+ name='required_location', type='GEOGRAPHY', mode="REQUIRED")
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+
+ usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
+ the_table_schema=schema)
+
+ expected_annotations = {
+ 'location': typing.Optional[str],
+ 'locations': typing.Sequence[str],
+ 'required_location': str
+ }
+
+ self.assertEqual(usertype.__annotations__, expected_annotations)
+
+ def test_geography_in_bq_to_python_types_mapping(self):
+ """Test that GEOGRAPHY is included in BIG_QUERY_TO_PYTHON_TYPES mapping."""
+ from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES
+
+ self.assertIn("GEOGRAPHY", BIG_QUERY_TO_PYTHON_TYPES)
+ self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["GEOGRAPHY"], str)
+
+ def test_geography_field_type_conversion(self):
+ """Test bq_field_to_type function with GEOGRAPHY fields."""
+ from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type
+
+ # Test required GEOGRAPHY field
+ result = bq_field_to_type("GEOGRAPHY", "REQUIRED")
+ self.assertEqual(result, str)
+
+ # Test nullable GEOGRAPHY field
+ result = bq_field_to_type("GEOGRAPHY", "NULLABLE")
+ self.assertEqual(result, typing.Optional[str])
+
+ # Test repeated GEOGRAPHY field
+ result = bq_field_to_type("GEOGRAPHY", "REPEATED")
+ self.assertEqual(result, typing.Sequence[str])
+
+ # Test GEOGRAPHY field with None mode (should default to nullable)
+ result = bq_field_to_type("GEOGRAPHY", None)
+ self.assertEqual(result, typing.Optional[str])
+
+ # Test GEOGRAPHY field with empty mode (should default to nullable)
+ result = bq_field_to_type("GEOGRAPHY", "")
+ self.assertEqual(result, typing.Optional[str])
+
+ def test_convert_to_usertype_with_geography(self):
+ """Test convert_to_usertype function with GEOGRAPHY fields."""
+ schema = bigquery.TableSchema(
+ fields=[
+ bigquery.TableFieldSchema(
+ name='id', type='INTEGER', mode="REQUIRED"),
+ bigquery.TableFieldSchema(
+ name='location', type='GEOGRAPHY', mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name='name', type='STRING', mode="REQUIRED")
+ ])
+
+ conversion_transform = bigquery_schema_tools.convert_to_usertype(schema)
+
+ # Verify the transform is created successfully
+ self.assertIsNotNone(conversion_transform)
+
+ # The transform should be a ParDo with BeamSchemaConversionDoFn
+ self.assertIsInstance(conversion_transform, beam.ParDo)
+
+ def test_beam_schema_conversion_dofn_with_geography(self):
+ """Test BeamSchemaConversionDoFn with GEOGRAPHY data."""
+ from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn
+
+ # Create a user type with GEOGRAPHY field
+ fields = [
+ bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"),
+ bigquery.TableFieldSchema(
+ name='location', type='GEOGRAPHY', mode="NULLABLE")
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)
+
+ # Create the DoFn
+ dofn = BeamSchemaConversionDoFn(usertype)
+
+ # Test processing a dictionary with GEOGRAPHY data
+ input_dict = {'id': 1, 'location': 'POINT(30 10)'}
+
+ results = list(dofn.process(input_dict))
+ self.assertEqual(len(results), 1)
+
+ result = results[0]
+ self.assertEqual(result.id, 1)
+ self.assertEqual(result.location, 'POINT(30 10)')
+
+ def test_geography_with_complex_wkt(self):
+ """Test GEOGRAPHY type with complex Well-Known Text geometries."""
+ fields = [
+ bigquery.TableFieldSchema(
+ name='simple_point', type='GEOGRAPHY', mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name='linestring', type='GEOGRAPHY', mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name='polygon', type='GEOGRAPHY', mode="NULLABLE"),
+ bigquery.TableFieldSchema(
+ name='multigeometry', type='GEOGRAPHY', mode="NULLABLE")
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+
+ usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)
+
+ # All GEOGRAPHY fields should map to Optional[str]
+ expected_annotations = {
+ 'simple_point': typing.Optional[str],
+ 'linestring': typing.Optional[str],
+ 'polygon': typing.Optional[str],
+ 'multigeometry': typing.Optional[str]
+ }
+
+ self.assertEqual(usertype.__annotations__, expected_annotations)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index d2fa7627a80..36a1015e3d2 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -121,6 +121,7 @@ BIGQUERY_TYPE_TO_PYTHON_TYPE = {
"FLOAT": np.float64,
"NUMERIC": decimal.Decimal,
"TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
+ "GEOGRAPHY": str,
}
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 1101317439a..066fc898554 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -1092,6 +1092,160 @@ class TestBeamTypehintFromSchema(unittest.TestCase):
self.assertEqual(typehints, expected_typehints)
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class TestGeographyTypeSupport(unittest.TestCase):
+ """Tests for GEOGRAPHY data type support in BigQuery."""
+ def test_geography_in_bigquery_type_mapping(self):
+ """Test that GEOGRAPHY is properly mapped in type mapping."""
+ from apache_beam.io.gcp.bigquery_tools import BIGQUERY_TYPE_TO_PYTHON_TYPE
+
+ self.assertIn("GEOGRAPHY", BIGQUERY_TYPE_TO_PYTHON_TYPE)
+ self.assertEqual(BIGQUERY_TYPE_TO_PYTHON_TYPE["GEOGRAPHY"], str)
+
+ def test_geography_field_conversion(self):
+ """Test that GEOGRAPHY fields are converted correctly."""
+ from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+
+ # Create a mock field with GEOGRAPHY type
+ field = bigquery.TableFieldSchema()
+ field.type = 'GEOGRAPHY'
+ field.name = 'location'
+ field.mode = 'NULLABLE'
+
+ wrapper = BigQueryWrapper()
+
+ # Test various WKT formats
+ test_cases = [
+ "POINT(30 10)",
+ "LINESTRING(30 10, 10 30, 40 40)",
+ "POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))",
+ "MULTIPOINT((10 40), (40 30), (20 20), (30 10))",
+ "GEOMETRYCOLLECTION(POINT(4 6),LINESTRING(4 6,7 10))"
+ ]
+
+ for wkt_value in test_cases:
+ result = wrapper._convert_cell_value_to_dict(wkt_value, field)
+ self.assertEqual(result, wkt_value)
+ self.assertIsInstance(result, str)
+
+ def test_geography_typehints_from_schema(self):
+ """Test that GEOGRAPHY fields generate correct type hints."""
+ schema = {
+ "fields": [{
+ "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+ },
+ {
+ "name": "optional_location",
+ "type": "GEOGRAPHY",
+ "mode": "NULLABLE"
+ }, {
+ "name": "locations",
+ "type": "GEOGRAPHY",
+ "mode": "REPEATED"
+ }]
+ }
+
+ typehints = get_beam_typehints_from_tableschema(schema)
+
+ expected_typehints = [("location", str),
+ ("optional_location", Optional[str]),
+ ("locations", Sequence[str])]
+
+ self.assertEqual(typehints, expected_typehints)
+
+ def test_geography_beam_row_conversion(self):
+ """Test converting dictionary with GEOGRAPHY to Beam Row."""
+ schema = {
+ "fields": [{
+ "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+ }, {
+ "name": "location", "type": "GEOGRAPHY", "mode": "NULLABLE"
+ }, {
+ "name": "name", "type": "STRING", "mode": "REQUIRED"
+ }]
+ }
+
+ row_dict = {"id": 1, "location": "POINT(30 10)", "name": "Test Location"}
+
+ beam_row = beam_row_from_dict(row_dict, schema)
+
+ self.assertEqual(beam_row.id, 1)
+ self.assertEqual(beam_row.location, "POINT(30 10)")
+ self.assertEqual(beam_row.name, "Test Location")
+
+ def test_geography_beam_row_conversion_with_null(self):
+ """Test converting dictionary with null GEOGRAPHY to Beam Row."""
+ schema = {
+ "fields": [{
+ "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+ }, {
+ "name": "location", "type": "GEOGRAPHY", "mode": "NULLABLE"
+ }]
+ }
+
+ row_dict = {"id": 1, "location": None}
+
+ beam_row = beam_row_from_dict(row_dict, schema)
+
+ self.assertEqual(beam_row.id, 1)
+ self.assertIsNone(beam_row.location)
+
+ def test_geography_beam_row_conversion_repeated(self):
+ """Test converting dictionary with repeated GEOGRAPHY to Beam Row."""
+ schema = {
+ "fields": [{
+ "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+ }, {
+ "name": "locations", "type": "GEOGRAPHY", "mode": "REPEATED"
+ }]
+ }
+
+ row_dict = {
+ "id": 1,
+ "locations": ["POINT(30 10)", "POINT(40 20)", "LINESTRING(0 0, 1 1)"]
+ }
+
+ beam_row = beam_row_from_dict(row_dict, schema)
+
+ self.assertEqual(beam_row.id, 1)
+ self.assertEqual(len(beam_row.locations), 3)
+ self.assertEqual(beam_row.locations[0], "POINT(30 10)")
+ self.assertEqual(beam_row.locations[1], "POINT(40 20)")
+ self.assertEqual(beam_row.locations[2], "LINESTRING(0 0, 1 1)")
+
+ def test_geography_json_encoding(self):
+ """Test that GEOGRAPHY values are properly JSON encoded."""
+ coder = RowAsDictJsonCoder()
+
+ row_with_geography = {"id": 1, "location": "POINT(30 10)", "name": "Test"}
+
+ encoded = coder.encode(row_with_geography)
+ decoded = coder.decode(encoded)
+
+ self.assertEqual(decoded["location"], "POINT(30 10)")
+ self.assertIsInstance(decoded["location"], str)
+
+ def test_geography_with_special_characters(self):
+ """Test GEOGRAPHY values with special characters and geometries."""
+ from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+
+ field = bigquery.TableFieldSchema()
+ field.type = 'GEOGRAPHY'
+ field.name = 'complex_geo'
+ field.mode = 'NULLABLE'
+
+ wrapper = BigQueryWrapper()
+
+ # Test complex WKT with various coordinate systems and precision
+ complex_wkt = (
+ "POLYGON((-122.4194 37.7749, -122.4094 37.7849, "
+ "-122.3994 37.7749, -122.4194 37.7749))")
+
+ result = wrapper._convert_cell_value_to_dict(complex_wkt, field)
+ self.assertEqual(result, complex_wkt)
+ self.assertIsInstance(result, str)
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()