TheNeuralBit commented on a change in pull request #12023: URL: https://github.com/apache/beam/pull/12023#discussion_r447171092
########## File path: sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py ########## @@ -0,0 +1,138 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +from __future__ import absolute_import + +import logging +import typing +import unittest + +from past.builtins import unicode + +import apache_beam as beam +from apache_beam import coders +from apache_beam.io.external.jdbc import WriteToJdbc +from apache_beam.testing.test_pipeline import TestPipeline + +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + import sqlalchemy +except ImportError: + sqlalchemy = None +# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports + +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + from testcontainers.postgres import PostgresContainer +except ImportError: + PostgresContainer = None +# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports + +JdbcTestRow = typing.NamedTuple( + "JdbcTestRow", + [ + ("f_id", int), + ("f_real", float), + ("f_string", unicode), + ], +) + +coders.registry.register_coder(JdbcTestRow, coders.RowCoder) + + +@unittest.skipIf(sqlalchemy is None, "sql alchemy package is not 
installed.") +@unittest.skipIf( + PostgresContainer is None, "testcontainers package is not installed") Review comment: I think we should add these packages to the test extra: https://github.com/apache/beam/blob/92170e85aa6284ad3908656ecc2d6f31505d0310/sdks/python/setup.py#L181-L198 cc: @aaltay in case there are any concerns with adding these dependencies. For licenses they're both under Apache 2.0 (and it's also just a test dependency). ########## File path: sdks/python/apache_beam/io/external/jdbc.py ########## @@ -0,0 +1,116 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + PTransforms for supporting Jdbc in Python pipelines. These transforms do not + run a Jdbc client in Python. Instead, they expand to ExternalTransforms + which the Expansion Service resolves to the Java SDK's JdbcIO. + + Note: To use these transforms, you need to start a Java Expansion Service. + Please refer to the portability documentation on how to do that. Flink Users + can use the built-in Expansion Service of the Flink Runner's Job Server. The + expansion service address has to be provided when instantiating the + transforms. + + If you start Flink's Job Server, the expansion service will be started on + port 8097. 
This is also the configured default for this transform. For a Review comment: But where does the code for (1) live on the python side? As far as I can tell the only two options for this transform are either overriding the expansion service with the `expansion_service=` parameter, or the default - running shadowJar with `BeamJarExpansionService` (what you have in (2)). I can't find any logic that looks for an expansion service on port 8097. Am I missing something? ########## File path: sdks/python/apache_beam/io/external/jdbc.py ########## @@ -0,0 +1,134 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + PTransforms for supporting Jdbc in Python pipelines. These transforms do not + run a Jdbc client in Python. Instead, they expand to ExternalTransforms + which the Expansion Service resolves to the Java SDK's JdbcIO. + + Note: To use these transforms, you need to start a Java Expansion Service. + Please refer to the portability documentation on how to do that. Flink Users + can use the built-in Expansion Service of the Flink Runner's Job Server. The + expansion service address has to be provided when instantiating the + transforms. + + If you start Flink's Job Server, the expansion service will be started on + port 8097. 
This is also the configured default for this transform. For a + different address, please set the expansion_service parameter. + + For more information see: + - https://beam.apache.org/documentation/runners/flink/ + - https://beam.apache.org/roadmap/portability/ +""" + +# pytype: skip-file + +from __future__ import absolute_import + +import typing + +from past.builtins import unicode + +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +__all__ = ['WriteToJdbc'] + + +def default_io_expansion_service(): + return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar') + + +WriteToJdbcSchema = typing.NamedTuple( + 'WriteToJdbcSchema', + [ + ('driver_class_name', unicode), + ('jdbc_url', unicode), + ('username', unicode), + ('password', unicode), + ('statement', unicode), + ('connection_properties', unicode), + ('connection_init_sqls', typing.Optional[typing.List[unicode]]), + ], +) + + +class WriteToJdbc(ExternalTransform): + """ + An external PTransform which writes Rows to the specified database. + + This transform receives Rows defined as NamedTuple type and registered in + the coders registry to use RowCoder, e.g. + + import typing + from apache_beam import coders + + ExampleRow = typing.NamedTuple( + "ExampleRow", + [ + ("id", int), + ("name", unicode), + ("budget", float), + ], + ) + coders.registry.register_coder(ExampleRow, coders.RowCoder) + + Experimental; no backwards compatibility guarantees. It requires special + preparation of the Java SDK. See BEAM-7870. + """ + + URN = 'beam:external:java:jdbc:write:v1' + + def __init__( + self, + driver_class_name, + jdbc_url, + username, + password, + statement, + connection_properties='', + connection_init_sqls=None, + expansion_service=None, + ): Review comment: Could you make these parameters into keyword args? 
It's easy for a user to get lost in a long list of function parameters, but the keyword args will make it explicit. The only cost is you'll need explicit checks that the required parameters aren't `None`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org