claudevdm commented on code in PR #34398:
URL: https://github.com/apache/beam/pull/34398#discussion_r2096135419


##########
sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py:
##########
@@ -0,0 +1,334 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from collections.abc import Callable
+from collections.abc import Mapping
+from enum import Enum
+from typing import Any
+from typing import Optional
+from typing import Union
+
+from sqlalchemy import create_engine
+from sqlalchemy import text
+
+import apache_beam as beam
+from apache_beam.transforms.enrichment import EnrichmentSourceHandler
+
+QueryFn = Callable[[beam.Row], str]
+ConditionValueFn = Callable[[beam.Row], list[Any]]
+
+
+def _validate_cloudsql_metadata(
+    table_id,
+    where_clause_template,
+    where_clause_fields,
+    where_clause_value_fn,
+    query_fn):
+  if query_fn:
+    if any([table_id,
+            where_clause_template,
+            where_clause_fields,
+            where_clause_value_fn]):
+      raise ValueError(
+          "Please provide either `query_fn` or the parameters `table_id`, "
+          "`where_clause_template`, and "
+          "`where_clause_fields/where_clause_value_fn` together.")
+  else:
+    if not (table_id and where_clause_template):
+      raise ValueError(
+          "Please provide either `query_fn` or the parameters "
+          "`table_id` and `where_clause_template` together.")
+    if (bool(where_clause_fields) == bool(where_clause_value_fn)):
+      raise ValueError(
+          "Please provide exactly one of `where_clause_fields` or "
+          "`where_clause_value_fn`.")
+
+
+class DatabaseTypeAdapter(Enum):
+  POSTGRESQL = "psycopg2"
+  MYSQL = "pymysql"
+  SQLSERVER = "pytds"
+
+  def to_sqlalchemy_dialect(self):
+    """
+      Map the adapter type to its corresponding SQLAlchemy dialect.
+      Returns:
+          str: SQLAlchemy dialect string.
+      """
+    if self == DatabaseTypeAdapter.POSTGRESQL:
+      return f"postgresql+{self.value}"
+    elif self == DatabaseTypeAdapter.MYSQL:
+      return f"mysql+{self.value}"
+    elif self == DatabaseTypeAdapter.SQLSERVER:
+      return f"mssql+{self.value}"
+    else:
+      raise ValueError(f"Unsupported adapter type: {self.name}")
+
+
+class CloudSQLEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]):
+  """
+  Enrichment handler for Cloud SQL databases.
+
+  This handler is designed to work with the
+  :class:`apache_beam.transforms.enrichment.Enrichment` transform.
+
+  To use this handler, you need to provide either of the following combinations:
+    * `table_id`, `where_clause_template`, `where_clause_fields`
+    * `table_id`, `where_clause_template`, `where_clause_value_fn`
+    * `query_fn`
+
+  By default, the handler retrieves all columns from the specified table.
+  To limit the columns, use the `column_names` parameter to specify
+  the desired column names.
+
+  This handler queries the Cloud SQL database per element by default.
+  To enable batching, set the `min_batch_size` and `max_batch_size` parameters.
+  These values control the batching behavior in the
+  :class:`apache_beam.transforms.utils.BatchElements` transform.
+
+  NOTE: Batching is not supported when using the `query_fn` parameter.
+  """
+  def __init__(
+      self,
+      database_type_adapter: DatabaseTypeAdapter,
+      database_address: str,
+      database_user: str,
+      database_password: str,
+      database_id: str,
+      *,
+      table_id: str = "",
+      where_clause_template: str = "",

Review Comment:
   Can we create dataclasses for the various permutations of the query 
parameters, to make it clearer which parameters belong together? For example, 
we could remove `table_id`, `where_clause_template`, `where_clause_fields`, 
`where_clause_value_fn`, and `query_fn`, and instead add a single parameter
   
   `query_config: Union[CustomQueryConfig, TableFieldsQueryConfig, 
TableFunctionQueryConfig]`, where each of these is a dataclass that clearly 
defines the expected parameters, like
   
   ```
   from dataclasses import dataclass
   from typing import Any, Callable, List
   
   import apache_beam as beam
   
   
   @dataclass
   class CustomQueryConfig:
       """Configuration for using a custom query function."""
       query_fn: Callable[[beam.Row], str]
   
   
   @dataclass
   class TableFieldsQueryConfig:
       """Configuration for using table name, where clause, and field names."""
       table_id: str
       where_clause_template: str
       where_clause_fields: List[str]
   
   
   @dataclass
   class TableFunctionQueryConfig:
       """Configuration for using table name, where clause, and a value
       function."""
       table_id: str
       where_clause_template: str
       where_clause_value_fn: Callable[[beam.Row], List[Any]]
   ```
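
To illustrate how the handler might consume such a `query_config`, here is a rough sketch of dispatching on the config type. This is only an illustration under assumptions, not a proposal for the final implementation: `build_query` is a hypothetical helper, rows are plain dicts standing in for `beam.Row` so the snippet runs without Beam, and I am assuming the where clause template uses positional `{}` placeholders filled with `str.format`.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Union

# Stand-in for beam.Row so the sketch runs without Apache Beam installed.
Row = Dict[str, Any]


@dataclass
class CustomQueryConfig:
  query_fn: Callable[[Row], str]


@dataclass
class TableFieldsQueryConfig:
  table_id: str
  where_clause_template: str
  where_clause_fields: List[str]


@dataclass
class TableFunctionQueryConfig:
  table_id: str
  where_clause_template: str
  where_clause_value_fn: Callable[[Row], List[Any]]


QueryConfig = Union[
    CustomQueryConfig, TableFieldsQueryConfig, TableFunctionQueryConfig]


def build_query(config: QueryConfig, row: Row) -> str:
  """Hypothetical helper: choose the query strategy from the config type."""
  if isinstance(config, CustomQueryConfig):
    # The user-supplied function owns the whole query string.
    return config.query_fn(row)
  if isinstance(config, TableFieldsQueryConfig):
    # Read the condition values from the named fields of the input row.
    values = [row[field] for field in config.where_clause_fields]
  elif isinstance(config, TableFunctionQueryConfig):
    # Let the user-supplied function compute the condition values.
    values = config.where_clause_value_fn(row)
  else:
    raise TypeError(f"Unsupported query config: {type(config).__name__}")
  # Illustration only: real code should bind values as query parameters
  # instead of formatting them into the SQL string.
  where_clause = config.where_clause_template.format(*values)
  return f"SELECT * FROM {config.table_id} WHERE {where_clause}"
```

With this shape, an invalid combination such as `query_fn` together with `table_id` cannot be constructed at all, so most of `_validate_cloudsql_metadata` would no longer be needed.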



##########
sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_test.py:
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from parameterized import parameterized
+
+# pylint: disable=ungrouped-imports
+try:
+  from apache_beam.transforms.enrichment_handlers.cloudsql import (
+      CloudSQLEnrichmentHandler, DatabaseTypeAdapter)
+  from apache_beam.transforms.enrichment_handlers.cloudsql_it_test import (
+      query_fn,
+      where_clause_value_fn,
+  )
+except ImportError:
+  raise unittest.SkipTest('Google Cloud SQL dependencies are not installed.')
+
+
+class TestCloudSQLEnrichment(unittest.TestCase):

Review Comment:
   Can we add test cases similar to 
https://github.com/apache/beam/blob/6bedaefaa79aa15c4bc90c454ef538c228757b2b/sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py?
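
As a starting point, and without assuming what the linked BigQuery tests cover, a validation-focused case could look roughly like the sketch below. It copies the logic of `_validate_cloudsql_metadata` from this PR (with shortened error messages) so that it runs on its own; `_value_fn`, `_query_fn`, and the test class name are hypothetical stand-ins, and it uses `subTest` from the standard library instead of `parameterized` to stay self-contained.

```python
import unittest


def _validate_cloudsql_metadata(
    table_id,
    where_clause_template,
    where_clause_fields,
    where_clause_value_fn,
    query_fn):
  # Logic copied from cloudsql.py in this PR (messages shortened) so the
  # sketch runs without Beam or SQLAlchemy installed.
  if query_fn:
    if any([table_id,
            where_clause_template,
            where_clause_fields,
            where_clause_value_fn]):
      raise ValueError("`query_fn` cannot be combined with table parameters.")
  else:
    if not (table_id and where_clause_template):
      raise ValueError("`table_id` and `where_clause_template` are required.")
    if bool(where_clause_fields) == bool(where_clause_value_fn):
      raise ValueError(
          "Provide exactly one of `where_clause_fields` or "
          "`where_clause_value_fn`.")


def _value_fn(row):
  # Hypothetical stand-in for a user-supplied value function.
  return [row["id"]]


def _query_fn(row):
  # Hypothetical stand-in for a user-supplied query function.
  return "SELECT 1"


# Argument order: table_id, where_clause_template, where_clause_fields,
# where_clause_value_fn, query_fn.
VALID_COMBINATIONS = [
    ("users", "id = {}", ["id"], None, None),
    ("users", "id = {}", None, _value_fn, None),
    ("", "", None, None, _query_fn),
]

INVALID_COMBINATIONS = [
    ("", "", None, None, None),  # nothing provided
    ("users", "id = {}", None, None, None),  # no fields and no value fn
    ("users", "id = {}", ["id"], _value_fn, None),  # both fields and value fn
    ("users", "", None, None, _query_fn),  # query_fn mixed with table_id
]


class CloudSQLValidationSketchTest(unittest.TestCase):
  def test_valid_combinations_pass(self):
    for args in VALID_COMBINATIONS:
      with self.subTest(args=args):
        _validate_cloudsql_metadata(*args)

  def test_invalid_combinations_raise(self):
    for args in INVALID_COMBINATIONS:
      with self.subTest(args=args):
        with self.assertRaises(ValueError):
          _validate_cloudsql_metadata(*args)
```

The integration-level cases against a real database would still need to be added separately.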



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

