This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 30fd958f5fc feat(bigquery): add GEOGRAPHY type support for BigQuery 
I/O (#36121)
30fd958f5fc is described below

commit 30fd958f5fc92ef2a7e069763f43077e32c63873
Author: liferoad <[email protected]>
AuthorDate: Wed Oct 15 14:47:47 2025 -0400

    feat(bigquery): add GEOGRAPHY type support for BigQuery I/O (#36121)
    
    * feat(bigquery): add GEOGRAPHY type support for BigQuery I/O
    
    Add support for BigQuery GEOGRAPHY type which works with Well-Known Text 
(WKT) format. The change includes:
    - Adding GEOGRAPHY to type mappings in bigquery_tools and 
bigquery_schema_tools
    - Implementing GeographyType logical type in schemas.py
    - Adding comprehensive tests for GEOGRAPHY type conversion and schema 
integration
    
    * fixed tests
    
    * tests
    
    * fixed tests
    
    * fixes language_type
    
    * fixed logical type
    
    * urns
    
    * add BQ IT
    
    * yapf
    
    * feat(bigquery): add project handling and test improvements
    
    - Add _get_project method to handle project billing in BigQuery source
    - Update tests to explicitly specify project parameter
    - Fix geography test data formats and simplify test cases
    - Add temporary storage location for file load tests
    
    * lint
    
    * format
    
    * removed GeographyType for now
    
    * restore schemas.py
    
    * added uses_gcp_java_expansion_service
---
 sdks/python/apache_beam/io/gcp/bigquery.py         |  13 +
 .../io/gcp/bigquery_geography_it_test.py           | 540 +++++++++++++++++++++
 .../apache_beam/io/gcp/bigquery_schema_tools.py    |   3 +-
 .../io/gcp/bigquery_schema_tools_test.py           | 134 ++++-
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   |   1 +
 .../apache_beam/io/gcp/bigquery_tools_test.py      | 154 ++++++
 6 files changed, 841 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 0905ba764de..7d5dd876bda 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1029,6 +1029,16 @@ class _CustomBigQueryStorageSource(BoundedSource):
     self._step_name = step_name
     self._source_uuid = unique_id
 
+  def _get_project(self):
+    """Returns the project that queries and exports will be billed to."""
+    if self.pipeline_options:
+      project = self.pipeline_options.view_as(GoogleCloudOptions).project
+      if isinstance(project, vp.ValueProvider):
+        project = project.get()
+      if project:
+        return project
+    return self.project
+
   def _get_parent_project(self):
     """Returns the project that will be billed."""
     if self.temp_table:
@@ -1164,6 +1174,9 @@ class _CustomBigQueryStorageSource(BoundedSource):
         self._setup_temporary_dataset(bq)
         self.table_reference = self._execute_query(bq)
 
+      if not self.table_reference.projectId:
+        self.table_reference.projectId = self._get_project()
+
       requested_session = bq_storage.types.ReadSession()
       requested_session.table = 'projects/{}/datasets/{}/tables/{}'.format(
           self.table_reference.projectId,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py
new file mode 100644
index 00000000000..5a506d3162f
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py
@@ -0,0 +1,540 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Integration tests for BigQuery GEOGRAPHY data type support."""
+
+import logging
+import secrets
+import time
+import unittest
+
+import hamcrest as hc
+import pytest
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigquery import ReadFromBigQuery
+from apache_beam.io.gcp.bigquery import WriteToBigQuery
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+
+_LOGGER = logging.getLogger(__name__)
+
+
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class BigQueryGeographyIntegrationTests(unittest.TestCase):
+  """Integration tests for BigQuery GEOGRAPHY data type."""
+
+  BIG_QUERY_DATASET_ID = 'python_geography_it_test_'
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+
+    self.bigquery_client = BigQueryWrapper()
+    self.dataset_id = '%s%d%s' % (
+        self.BIG_QUERY_DATASET_ID, int(time.time()), secrets.token_hex(3))
+    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+    _LOGGER.info(
+        "Created dataset %s in project %s", self.dataset_id, self.project)
+
+  def tearDown(self):
+    request = bigquery.BigqueryDatasetsDeleteRequest(
+        projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
+    try:
+      _LOGGER.info(
+          "Deleting dataset %s in project %s", self.dataset_id, self.project)
+      self.bigquery_client.client.datasets.Delete(request)
+    except HttpError:
+      _LOGGER.debug(
+          'Failed to clean up dataset %s in project %s',
+          self.dataset_id,
+          self.project)
+
+  def create_geography_table(self, table_name, include_repeated=False):
+    """Create a table with various GEOGRAPHY field configurations."""
+    table_schema = bigquery.TableSchema()
+
+    # ID field
+    id_field = bigquery.TableFieldSchema()
+    id_field.name = 'id'
+    id_field.type = 'INTEGER'
+    id_field.mode = 'REQUIRED'
+    table_schema.fields.append(id_field)
+
+    # Required GEOGRAPHY field
+    geo_required = bigquery.TableFieldSchema()
+    geo_required.name = 'location'
+    geo_required.type = 'GEOGRAPHY'
+    geo_required.mode = 'REQUIRED'
+    table_schema.fields.append(geo_required)
+
+    # Nullable GEOGRAPHY field
+    geo_nullable = bigquery.TableFieldSchema()
+    geo_nullable.name = 'optional_location'
+    geo_nullable.type = 'GEOGRAPHY'
+    geo_nullable.mode = 'NULLABLE'
+    table_schema.fields.append(geo_nullable)
+
+    if include_repeated:
+      # Repeated GEOGRAPHY field
+      geo_repeated = bigquery.TableFieldSchema()
+      geo_repeated.name = 'path'
+      geo_repeated.type = 'GEOGRAPHY'
+      geo_repeated.mode = 'REPEATED'
+      table_schema.fields.append(geo_repeated)
+
+    table = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId=self.project,
+            datasetId=self.dataset_id,
+            tableId=table_name),
+        schema=table_schema)
+    request = bigquery.BigqueryTablesInsertRequest(
+        projectId=self.project, datasetId=self.dataset_id, table=table)
+    self.bigquery_client.client.tables.Insert(request)
+
+    # Wait for table to be available
+    _ = self.bigquery_client.get_table(
+        self.project, self.dataset_id, table_name)
+
+  @pytest.mark.it_postcommit
+  def test_geography_write_and_read_basic_geometries(self):
+    """Test writing and reading basic GEOGRAPHY geometries."""
+    table_name = 'geography_basic_geometries'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    # Test data with various WKT geometry types
+    input_data = [
+        {
+            'id': 1,
+            'location': 'POINT(30 10)',
+            'optional_location': ('POINT(-122.4194 37.7749)')  # San Francisco
+        },
+        {
+            'id': 2,
+            'location': 'LINESTRING(30 10, 10 30, 40 40)',
+            'optional_location': None
+        },
+        {
+            'id': 3,
+            'location': ('POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))'),
+            'optional_location': ('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')
+        },
+        {
+            'id': 4,
+            'location': ('MULTIPOINT((10 40), (40 30), (20 20), (30 10))'),
+            'optional_location': 'POINT(0 0)'
+        },
+        {
+            'id': 5,
+            'location': (
+                'MULTILINESTRING((10 10, 20 20, 10 40), '
+                '(40 40, 30 30, 40 20, 30 10))'),
+            'optional_location': None
+        }
+    ]
+
+    table_schema = {
+        "fields": [{
+            "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+        }, {
+            "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+        },
+                   {
+                       "name": "optional_location",
+                       "type": "GEOGRAPHY",
+                       "mode": "NULLABLE"
+                   }]
+    }
+
+    # Write data to BigQuery
+    with TestPipeline(is_integration_test=True) as p:
+      _ = (
+          p
+          | 'CreateData' >> beam.Create(input_data)
+          | 'WriteToBQ' >> WriteToBigQuery(
+              table=table_id,
+              schema=table_schema,
+              method=WriteToBigQuery.Method.STREAMING_INSERTS,
+              project=self.project))
+
+    # Read data back and verify
+    with TestPipeline(is_integration_test=True) as p:
+      result = (
+          p
+          | 'ReadFromBQ' >> ReadFromBigQuery(
+              table=table_id,
+              project=self.project,
+              method=ReadFromBigQuery.Method.DIRECT_READ)
+          | 'ExtractGeography' >> beam.Map(
+              lambda row:
+              (row['id'], row['location'], row['optional_location'])))
+
+      expected_data = [
+          (1, 'POINT(30 10)', 'POINT(-122.4194 37.7749)'),
+          (2, 'LINESTRING(30 10, 10 30, 40 40)', None),
+          (
+              3,
+              'POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))',
+              'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'),
+          (4, 'MULTIPOINT(20 20, 10 40, 40 30, 30 10)', 'POINT(0 0)'),
+          (
+              5,
+              'MULTILINESTRING((10 10, 20 20, 10 40), '
+              '(40 40, 30 30, 40 20, 30 10))',
+              None)
+      ]
+
+      assert_that(result, equal_to(expected_data))
+
+  @pytest.mark.it_postcommit
+  def test_geography_write_with_beam_rows(self):
+    """Test writing GEOGRAPHY data using Beam Rows with GeographyType."""
+    table_name = 'geography_beam_rows'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    # Create the table first
+    self.create_geography_table(table_name)
+
+    # Create Beam Rows with GeographyType
+    row_elements = [
+        beam.Row(id=1, location='POINT(1 1)', optional_location='POINT(2 2)'),
+        beam.Row(
+            id=2, location='LINESTRING(0 0, 1 1, 2 2)', 
optional_location=None),
+        beam.Row(
+            id=3,
+            location='POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))',
+            optional_location='POINT(3 3)')
+    ]
+
+    # Expected data for verification
+    expected_data = [(1, 'POINT(1 1)', 'POINT(2 2)'),
+                     (2, 'LINESTRING(0 0, 1 1, 2 2)', None),
+                     (3, 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', 'POINT(3 3)')]
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query=(
+                "SELECT id, location, optional_location FROM %s ORDER BY id" %
+                table_id),
+            data=expected_data)
+    ]
+
+    args = self.test_pipeline.get_full_options_as_args()
+
+    with beam.Pipeline(argv=args) as p:
+      _ = (
+          p
+          | 'CreateRows' >> beam.Create(row_elements)
+          | 'ConvertToDict' >> beam.Map(
+              lambda row: {
+                  'id': row.id, 'location': row.location,
+                  'optional_location': row.optional_location
+              })
+          | 'WriteToBQ' >> WriteToBigQuery(
+              table=table_id,
+              method=WriteToBigQuery.Method.STREAMING_INSERTS,
+              schema={
+                  "fields": [{
+                      "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+                  },
+                             {
+                                 "name": "location",
+                                 "type": "GEOGRAPHY",
+                                 "mode": "REQUIRED"
+                             },
+                             {
+                                 "name": "optional_location",
+                                 "type": "GEOGRAPHY",
+                                 "mode": "NULLABLE"
+                             }]
+              }))
+
+    # Wait a bit for streaming inserts to complete
+    time.sleep(5)
+
+    # Verify the data was written correctly
+    hc.assert_that(None, hc.all_of(*pipeline_verifiers))
+
+  @pytest.mark.it_postcommit
+  def test_geography_repeated_fields(self):
+    """Test GEOGRAPHY fields with REPEATED mode."""
+    table_name = 'geography_repeated'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [
+        {
+            'id': 1,
+            'location': 'POINT(0 0)',
+            'optional_location': 'POINT(1 1)',
+            'path': ['POINT(0 0)', 'POINT(1 1)', 'POINT(2 2)']
+        },
+        {
+            'id': 2,
+            'location': 'POINT(10 10)',
+            'optional_location': None,
+            'path': ['LINESTRING(0 0, 5 5)', 'LINESTRING(5 5, 10 10)']
+        },
+        {
+            'id': 3,
+            'location': 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))',
+            'optional_location': 'POINT(0.5 0.5)',
+            'path': []  # Empty array
+        }
+    ]
+
+    table_schema = {
+        "fields": [{
+            "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+        }, {
+            "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+        },
+                   {
+                       "name": "optional_location",
+                       "type": "GEOGRAPHY",
+                       "mode": "NULLABLE"
+                   }, {
+                       "name": "path", "type": "GEOGRAPHY", "mode": "REPEATED"
+                   }]
+    }
+
+    # Write data
+    args = self.test_pipeline.get_full_options_as_args()
+    with beam.Pipeline(argv=args) as p:
+      _ = (
+          p
+          | 'CreateData' >> beam.Create(input_data)
+          | 'WriteToBQ' >> WriteToBigQuery(
+              table=table_id,
+              schema=table_schema,
+              method=WriteToBigQuery.Method.STREAMING_INSERTS))
+
+    # Read and verify
+    with beam.Pipeline(argv=args) as p:
+      result = (
+          p
+          | 'ReadFromBQ' >> ReadFromBigQuery(
+              table=table_id,
+              method=ReadFromBigQuery.Method.DIRECT_READ,
+              project=self.project)
+          | 'ExtractData' >> beam.Map(
+              lambda row: (row['id'], len(row['path']) if row['path'] else 0)))
+
+      expected_counts = [(1, 3), (2, 2), (3, 0)]
+      assert_that(result, equal_to(expected_counts))
+
+  @pytest.mark.it_postcommit
+  def test_geography_complex_geometries(self):
+    """Test complex GEOGRAPHY geometries and edge cases."""
+    table_name = 'geography_complex'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    # Complex geometries including collections and high precision coordinates
+    input_data = [
+        {
+            'id': 1,
+            'location': (
+                'GEOMETRYCOLLECTION(POINT(4 6), LINESTRING(4 6, 7 10))'),
+            'optional_location': None
+        },
+        {
+            'id': 2,
+            'location': (
+                'MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), '
+                '((2 2, 3 2, 3 3, 2 3, 2 2)))'),  # Fixed orientation
+            'optional_location': ('POINT(-122.419416 37.774929)'
+                                  )  # High precision
+        },
+        {
+            'id': 3,
+            'location': ('POLYGON((0 0, 0 5, 5 5, 5 0, 0 0))'
+                         ),  # Simple polygon without holes
+            'optional_location': ('LINESTRING(-122 37, -121 38)'
+                                  )  # Fixed non-antipodal coordinates
+        }
+    ]
+
+    table_schema = {
+        "fields": [{
+            "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+        }, {
+            "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+        },
+                   {
+                       "name": "optional_location",
+                       "type": "GEOGRAPHY",
+                       "mode": "NULLABLE"
+                   }]
+    }
+
+    expected_data = [(1, 'LINESTRING(4 6, 7 10)', None),
+                     (
+                         2,
+                         'MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), '
+                         '((2 2, 3 2, 3 3, 2 3, 2 2)))',
+                         'POINT(-122.419416 37.774929)'),
+                     (
+                         3,
+                         'POLYGON((0 0, 0 5, 5 5, 5 0, 0 0))',
+                         'LINESTRING(-122 37, -121 38)')]
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query=(
+                "SELECT id, location, optional_location FROM %s ORDER BY id" %
+                table_id),
+            data=expected_data)
+    ]
+
+    args = self.test_pipeline.get_full_options_as_args()
+
+    with beam.Pipeline(argv=args) as p:
+      _ = (
+          p
+          | 'CreateData' >> beam.Create(input_data)
+          | 'WriteToBQ' >> WriteToBigQuery(
+              table=table_id,
+              schema=table_schema,
+              method=WriteToBigQuery.Method.STREAMING_INSERTS))
+
+    hc.assert_that(p, hc.all_of(*pipeline_verifiers))
+
+  @pytest.mark.uses_gcp_java_expansion_service
+  @pytest.mark.it_postcommit
+  def test_geography_storage_write_api(self):
+    """Test GEOGRAPHY with Storage Write API method."""
+    table_name = 'geography_storage_write'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [{
+        'id': 1, 'location': 'POINT(0 0)', 'optional_location': 'POINT(1 1)'
+    },
+                  {
+                      'id': 2,
+                      'location': 'LINESTRING(0 0, 1 1)',
+                      'optional_location': None
+                  }]
+
+    table_schema = {
+        "fields": [{
+            "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+        }, {
+            "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+        },
+                   {
+                       "name": "optional_location",
+                       "type": "GEOGRAPHY",
+                       "mode": "NULLABLE"
+                   }]
+    }
+
+    expected_data = [(1, 'POINT(0 0)', 'POINT(1 1)'),
+                     (2, 'LINESTRING(0 0, 1 1)', None)]
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query=(
+                "SELECT id, location, optional_location FROM %s ORDER BY id" %
+                table_id),
+            data=expected_data)
+    ]
+
+    args = self.test_pipeline.get_full_options_as_args()
+
+    with beam.Pipeline(argv=args) as p:
+      _ = (
+          p
+          | 'CreateData' >> beam.Create(input_data)
+          | 'WriteToBQ' >> WriteToBigQuery(
+              table=table_id,
+              schema=table_schema,
+              method=WriteToBigQuery.Method.STORAGE_WRITE_API))
+
+    hc.assert_that(p, hc.all_of(*pipeline_verifiers))
+
+  @pytest.mark.it_postcommit
+  def test_geography_file_loads_method(self):
+    """Test GEOGRAPHY with FILE_LOADS method."""
+    table_name = 'geography_file_loads'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [
+        {
+            'id': i,
+            'location': f'POINT({i} {i})',
+            'optional_location': (
+                f'POINT({i+10} {i+10})' if i % 2 == 0 else None)
+        } for i in range(1, 11)  # 10 records
+    ]
+
+    table_schema = {
+        "fields": [{
+            "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+        }, {
+            "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+        },
+                   {
+                       "name": "optional_location",
+                       "type": "GEOGRAPHY",
+                       "mode": "NULLABLE"
+                   }]
+    }
+
+    # Verify count and some sample data
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT COUNT(*) as count FROM %s" % table_id,
+            data=[(10, )])
+    ]
+
+    args = self.test_pipeline.get_full_options_as_args()
+    gcs_temp_location = (
+        f'gs://temp-storage-for-end-to-end-tests/'
+        f'bq_it_test_{int(time.time())}')
+
+    with beam.Pipeline(argv=args) as p:
+      _ = (
+          p
+          | 'CreateData' >> beam.Create(input_data)
+          | 'WriteToBQ' >> WriteToBigQuery(
+              table=table_id,
+              schema=table_schema,
+              method=WriteToBigQuery.Method.FILE_LOADS,
+              custom_gcs_temp_location=gcs_temp_location))
+
+    hc.assert_that(p, hc.all_of(*pipeline_verifiers))
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py 
b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
index beb373a7dea..54c7ca90f01 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
@@ -47,7 +47,8 @@ BIG_QUERY_TO_PYTHON_TYPES = {
     "FLOAT": np.float64,
     "BOOLEAN": bool,
     "BYTES": bytes,
-    "TIMESTAMP": apache_beam.utils.timestamp.Timestamp
+    "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
+    "GEOGRAPHY": str,
     #TODO(https://github.com/apache/beam/issues/20810):
     # Finish mappings for all BQ types
 }
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
index 7ae49dff205..0eb3351ee84 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
@@ -21,6 +21,7 @@ import unittest.mock
 import mock
 import numpy as np
 
+import apache_beam as beam
 import apache_beam.io.gcp.bigquery
 from apache_beam.io.gcp import bigquery_schema_tools
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -209,6 +210,133 @@ class TestBigQueryToSchema(unittest.TestCase):
           query='SELECT name FROM dataset.sample_table',
           output_type='BEAM_ROW')
 
-  if __name__ == '__main__':
-    logging.getLogger().setLevel(logging.INFO)
-    unittest.main()
+  def test_geography_type_support(self):
+    """Test that GEOGRAPHY type is properly supported in schema conversion."""
+    fields = [
+        bigquery.TableFieldSchema(
+            name='location', type='GEOGRAPHY', mode="NULLABLE"),
+        bigquery.TableFieldSchema(
+            name='locations', type='GEOGRAPHY', mode="REPEATED"),
+        bigquery.TableFieldSchema(
+            name='required_location', type='GEOGRAPHY', mode="REQUIRED")
+    ]
+    schema = bigquery.TableSchema(fields=fields)
+
+    usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
+        the_table_schema=schema)
+
+    expected_annotations = {
+        'location': typing.Optional[str],
+        'locations': typing.Sequence[str],
+        'required_location': str
+    }
+
+    self.assertEqual(usertype.__annotations__, expected_annotations)
+
+  def test_geography_in_bq_to_python_types_mapping(self):
+    """Test that GEOGRAPHY is included in BIG_QUERY_TO_PYTHON_TYPES mapping."""
+    from apache_beam.io.gcp.bigquery_schema_tools import 
BIG_QUERY_TO_PYTHON_TYPES
+
+    self.assertIn("GEOGRAPHY", BIG_QUERY_TO_PYTHON_TYPES)
+    self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["GEOGRAPHY"], str)
+
+  def test_geography_field_type_conversion(self):
+    """Test bq_field_to_type function with GEOGRAPHY fields."""
+    from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type
+
+    # Test required GEOGRAPHY field
+    result = bq_field_to_type("GEOGRAPHY", "REQUIRED")
+    self.assertEqual(result, str)
+
+    # Test nullable GEOGRAPHY field
+    result = bq_field_to_type("GEOGRAPHY", "NULLABLE")
+    self.assertEqual(result, typing.Optional[str])
+
+    # Test repeated GEOGRAPHY field
+    result = bq_field_to_type("GEOGRAPHY", "REPEATED")
+    self.assertEqual(result, typing.Sequence[str])
+
+    # Test GEOGRAPHY field with None mode (should default to nullable)
+    result = bq_field_to_type("GEOGRAPHY", None)
+    self.assertEqual(result, typing.Optional[str])
+
+    # Test GEOGRAPHY field with empty mode (should default to nullable)
+    result = bq_field_to_type("GEOGRAPHY", "")
+    self.assertEqual(result, typing.Optional[str])
+
+  def test_convert_to_usertype_with_geography(self):
+    """Test convert_to_usertype function with GEOGRAPHY fields."""
+    schema = bigquery.TableSchema(
+        fields=[
+            bigquery.TableFieldSchema(
+                name='id', type='INTEGER', mode="REQUIRED"),
+            bigquery.TableFieldSchema(
+                name='location', type='GEOGRAPHY', mode="NULLABLE"),
+            bigquery.TableFieldSchema(
+                name='name', type='STRING', mode="REQUIRED")
+        ])
+
+    conversion_transform = bigquery_schema_tools.convert_to_usertype(schema)
+
+    # Verify the transform is created successfully
+    self.assertIsNotNone(conversion_transform)
+
+    # The transform should be a ParDo with BeamSchemaConversionDoFn
+    self.assertIsInstance(conversion_transform, beam.ParDo)
+
+  def test_beam_schema_conversion_dofn_with_geography(self):
+    """Test BeamSchemaConversionDoFn with GEOGRAPHY data."""
+    from apache_beam.io.gcp.bigquery_schema_tools import 
BeamSchemaConversionDoFn
+
+    # Create a user type with GEOGRAPHY field
+    fields = [
+        bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"),
+        bigquery.TableFieldSchema(
+            name='location', type='GEOGRAPHY', mode="NULLABLE")
+    ]
+    schema = bigquery.TableSchema(fields=fields)
+    usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)
+
+    # Create the DoFn
+    dofn = BeamSchemaConversionDoFn(usertype)
+
+    # Test processing a dictionary with GEOGRAPHY data
+    input_dict = {'id': 1, 'location': 'POINT(30 10)'}
+
+    results = list(dofn.process(input_dict))
+    self.assertEqual(len(results), 1)
+
+    result = results[0]
+    self.assertEqual(result.id, 1)
+    self.assertEqual(result.location, 'POINT(30 10)')
+
+  def test_geography_with_complex_wkt(self):
+    """Test GEOGRAPHY type with complex Well-Known Text geometries."""
+    fields = [
+        bigquery.TableFieldSchema(
+            name='simple_point', type='GEOGRAPHY', mode="NULLABLE"),
+        bigquery.TableFieldSchema(
+            name='linestring', type='GEOGRAPHY', mode="NULLABLE"),
+        bigquery.TableFieldSchema(
+            name='polygon', type='GEOGRAPHY', mode="NULLABLE"),
+        bigquery.TableFieldSchema(
+            name='multigeometry', type='GEOGRAPHY', mode="NULLABLE")
+    ]
+    schema = bigquery.TableSchema(fields=fields)
+
+    usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)
+
+    # All GEOGRAPHY fields should map to Optional[str]
+    expected_annotations = {
+        'simple_point': typing.Optional[str],
+        'linestring': typing.Optional[str],
+        'polygon': typing.Optional[str],
+        'multigeometry': typing.Optional[str]
+    }
+
+    self.assertEqual(usertype.__annotations__, expected_annotations)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index d2fa7627a80..36a1015e3d2 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -121,6 +121,7 @@ BIGQUERY_TYPE_TO_PYTHON_TYPE = {
     "FLOAT": np.float64,
     "NUMERIC": decimal.Decimal,
     "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
+    "GEOGRAPHY": str,
 }
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 1101317439a..066fc898554 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -1092,6 +1092,160 @@ class TestBeamTypehintFromSchema(unittest.TestCase):
     self.assertEqual(typehints, expected_typehints)
 
 
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class TestGeographyTypeSupport(unittest.TestCase):
+  """Tests for GEOGRAPHY data type support in BigQuery."""
+  def test_geography_in_bigquery_type_mapping(self):
+    """Test that GEOGRAPHY is properly mapped in type mapping."""
+    from apache_beam.io.gcp.bigquery_tools import BIGQUERY_TYPE_TO_PYTHON_TYPE
+
+    self.assertIn("GEOGRAPHY", BIGQUERY_TYPE_TO_PYTHON_TYPE)
+    self.assertEqual(BIGQUERY_TYPE_TO_PYTHON_TYPE["GEOGRAPHY"], str)
+
+  def test_geography_field_conversion(self):
+    """Test that GEOGRAPHY fields are converted correctly."""
+    from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+
+    # Create a mock field with GEOGRAPHY type
+    field = bigquery.TableFieldSchema()
+    field.type = 'GEOGRAPHY'
+    field.name = 'location'
+    field.mode = 'NULLABLE'
+
+    wrapper = BigQueryWrapper()
+
+    # Test various WKT formats
+    test_cases = [
+        "POINT(30 10)",
+        "LINESTRING(30 10, 10 30, 40 40)",
+        "POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))",
+        "MULTIPOINT((10 40), (40 30), (20 20), (30 10))",
+        "GEOMETRYCOLLECTION(POINT(4 6),LINESTRING(4 6,7 10))"
+    ]
+
+    for wkt_value in test_cases:
+      result = wrapper._convert_cell_value_to_dict(wkt_value, field)
+      self.assertEqual(result, wkt_value)
+      self.assertIsInstance(result, str)
+
+  def test_geography_typehints_from_schema(self):
+    """Test that GEOGRAPHY fields generate correct type hints."""
+    schema = {
+        "fields": [{
+            "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"
+        },
+                   {
+                       "name": "optional_location",
+                       "type": "GEOGRAPHY",
+                       "mode": "NULLABLE"
+                   }, {
+                       "name": "locations",
+                       "type": "GEOGRAPHY",
+                       "mode": "REPEATED"
+                   }]
+    }
+
+    typehints = get_beam_typehints_from_tableschema(schema)
+
+    expected_typehints = [("location", str),
+                          ("optional_location", Optional[str]),
+                          ("locations", Sequence[str])]
+
+    self.assertEqual(typehints, expected_typehints)
+
+  def test_geography_beam_row_conversion(self):
+    """Test converting dictionary with GEOGRAPHY to Beam Row."""
+    schema = {
+        "fields": [{
+            "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+        }, {
+            "name": "location", "type": "GEOGRAPHY", "mode": "NULLABLE"
+        }, {
+            "name": "name", "type": "STRING", "mode": "REQUIRED"
+        }]
+    }
+
+    row_dict = {"id": 1, "location": "POINT(30 10)", "name": "Test Location"}
+
+    beam_row = beam_row_from_dict(row_dict, schema)
+
+    self.assertEqual(beam_row.id, 1)
+    self.assertEqual(beam_row.location, "POINT(30 10)")
+    self.assertEqual(beam_row.name, "Test Location")
+
+  def test_geography_beam_row_conversion_with_null(self):
+    """Test converting dictionary with null GEOGRAPHY to Beam Row."""
+    schema = {
+        "fields": [{
+            "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+        }, {
+            "name": "location", "type": "GEOGRAPHY", "mode": "NULLABLE"
+        }]
+    }
+
+    row_dict = {"id": 1, "location": None}
+
+    beam_row = beam_row_from_dict(row_dict, schema)
+
+    self.assertEqual(beam_row.id, 1)
+    self.assertIsNone(beam_row.location)
+
+  def test_geography_beam_row_conversion_repeated(self):
+    """Test converting dictionary with repeated GEOGRAPHY to Beam Row."""
+    schema = {
+        "fields": [{
+            "name": "id", "type": "INTEGER", "mode": "REQUIRED"
+        }, {
+            "name": "locations", "type": "GEOGRAPHY", "mode": "REPEATED"
+        }]
+    }
+
+    row_dict = {
+        "id": 1,
+        "locations": ["POINT(30 10)", "POINT(40 20)", "LINESTRING(0 0, 1 1)"]
+    }
+
+    beam_row = beam_row_from_dict(row_dict, schema)
+
+    self.assertEqual(beam_row.id, 1)
+    self.assertEqual(len(beam_row.locations), 3)
+    self.assertEqual(beam_row.locations[0], "POINT(30 10)")
+    self.assertEqual(beam_row.locations[1], "POINT(40 20)")
+    self.assertEqual(beam_row.locations[2], "LINESTRING(0 0, 1 1)")
+
+  def test_geography_json_encoding(self):
+    """Test that GEOGRAPHY values are properly JSON encoded."""
+    coder = RowAsDictJsonCoder()
+
+    row_with_geography = {"id": 1, "location": "POINT(30 10)", "name": "Test"}
+
+    encoded = coder.encode(row_with_geography)
+    decoded = coder.decode(encoded)
+
+    self.assertEqual(decoded["location"], "POINT(30 10)")
+    self.assertIsInstance(decoded["location"], str)
+
+  def test_geography_with_special_characters(self):
+    """Test GEOGRAPHY values with special characters and geometries."""
+    from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+
+    field = bigquery.TableFieldSchema()
+    field.type = 'GEOGRAPHY'
+    field.name = 'complex_geo'
+    field.mode = 'NULLABLE'
+
+    wrapper = BigQueryWrapper()
+
+    # Test complex WKT with various coordinate systems and precision
+    complex_wkt = (
+        "POLYGON((-122.4194 37.7749, -122.4094 37.7849, "
+        "-122.3994 37.7749, -122.4194 37.7749))")
+
+    result = wrapper._convert_cell_value_to_dict(complex_wkt, field)
+    self.assertEqual(result, complex_wkt)
+    self.assertIsInstance(result, str)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()


Reply via email to