TheNeuralBit commented on a change in pull request #12611: URL: https://github.com/apache/beam/pull/12611#discussion_r520210174
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.TimestampBound; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * Exposes {@link SpannerIO.WriteRows} and {@link SpannerIO.ReadRows} as an external transform for + * cross-language usage. + */ +@Experimental(Kind.PORTABILITY) +@AutoService(ExternalTransformRegistrar.class) +public class SpannerTransformRegistrar implements ExternalTransformRegistrar { + public static final String WRITE_URN = "beam:external:java:spanner:write:v1"; + public static final String READ_URN = "beam:external:java:spanner:read:v1"; + + @Override + public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() { + return ImmutableMap.of(WRITE_URN, new WriteBuilder(), READ_URN, new ReadBuilder()); Review comment: What I had in mind was that there would be a separate URN for each possible write operation like `beam:external:java:spanner:delete`, `beam:external:java:spanner:insert_or_update`, ... Rather than accepting mutations encoded as rows to go over the xlang boundary, each of these transforms would have an input of just `PCollection<Row>` representing the actual data (or `PCollection<List<Row>>` representing the keyset in the Delete case). Then python doesn't even need to have a concept of mutations. There's definitely value in a generic `beam:external:java:spanner:write` transform which accepts arbitrary mutations, but I think we should leave that for future work. ########## File path: sdks/python/apache_beam/io/gcp/spanner.py ########## @@ -0,0 +1,662 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PTransforms for supporting Spanner in Python pipelines. + + These transforms are currently supported by Beam portable + Flink and Spark runners. + + **Setup** + + Transforms provided in this module are cross-language transforms + implemented in the Beam Java SDK. During the pipeline construction, Python SDK + will connect to a Java expansion service to expand these transforms. + To facilitate this, a small amount of setup is needed before using these + transforms in a Beam Python pipeline. + + There are several ways to setup cross-language Spanner transforms. + + * Option 1: use the default expansion service + * Option 2: specify a custom expansion service + + See below for details regarding each of these options. + + *Option 1: Use the default expansion service* + + This is the recommended and easiest setup option for using Python Spanner + transforms. This option is only available for Beam 2.26.0 and later. + + This option requires following pre-requisites before running the Beam + pipeline. + + * Install Java runtime in the computer from where the pipeline is constructed + and make sure that 'java' command is available. + + In this option, Python SDK will either download (for released Beam version) or + build (when running from a Beam Git clone) a expansion service jar and use + that to expand transforms. Currently Spanner transforms use the + 'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this + purpose. + + *Option 2: specify a custom expansion service* + + In this option, you startup your own expansion service and provide that as + a parameter when using the transforms provided in this module. + + This option requires following pre-requisites before running the Beam + pipeline. + + * Startup your own expansion service. + * Update your pipeline to provide the expansion service address when + initiating Spanner transforms provided in this module. + + Flink Users can use the built-in Expansion Service of the Flink Runner's + Job Server. If you start Flink's Job Server, the expansion service will be + started on port 8097. For a different address, please set the + expansion_service parameter. + + **More information** + + For more information regarding cross-language transforms see: + - https://beam.apache.org/roadmap/portability/ + + For more information specific to Flink runner see: + - https://beam.apache.org/documentation/runners/flink/ +""" + +# pytype: skip-file + +from __future__ import absolute_import + +import uuid +from enum import Enum +from enum import auto +from typing import List +from typing import NamedTuple +from typing import Optional + +from past.builtins import unicode + +from apache_beam import Map +from apache_beam import PTransform +from apache_beam import coders +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder +from apache_beam.typehints.schemas import named_tuple_from_schema +from apache_beam.typehints.schemas import named_tuple_to_schema +from apache_beam.typehints.schemas import schema_from_element_type + +__all__ = [ + 'ReadFromSpanner', + 'SpannerDelete', + 'SpannerInsert', + 'SpannerInsertOrUpdate', + 'SpannerReplace', + 'SpannerUpdate', + 'TimestampBoundMode', + 'TimeUnit', +] + + +def default_io_expansion_service(): + return BeamJarExpansionService( + 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar') + + +_READ_URN = 'beam:external:java:spanner:read:v1' +_WRITE_URN = 'beam:external:java:spanner:write:v1' + + +class TimeUnit(Enum): + NANOSECONDS = auto() + MICROSECONDS = auto() + MILLISECONDS = auto() + SECONDS = auto() + HOURS = auto() + DAYS = auto() + + +class TimestampBoundMode(Enum): + MAX_STALENESS = auto() + EXACT_STALENESS = auto() + READ_TIMESTAMP = auto() + MIN_READ_TIMESTAMP = auto() + STRONG = auto() + + +class ReadFromSpannerSchema(NamedTuple): + instance_id: unicode + database_id: unicode + schema: bytes + sql: Optional[unicode] + table: Optional[unicode] + project_id: Optional[unicode] + host: Optional[unicode] + emulator_host: Optional[unicode] + batching: Optional[bool] + timestamp_bound_mode: Optional[unicode] + read_timestamp: Optional[unicode] + exact_staleness: Optional[int] + time_unit: Optional[unicode] + + +class ReadFromSpanner(ExternalTransform): + """ + A PTransform which reads from the specified Spanner instance's database. + + This transform required type of the row it has to return to provide the + schema. Example:: + + ExampleRow = typing.NamedTuple('ExampleRow', + [('id', int), ('name', unicode)]) + + with Pipeline() as p: + result = ( + p + | ReadFromSpanner( + instance_id='your_instance_id', + database_id='your_database_id', + project_id='your_project_id', + row_type=ExampleRow, + query='SELECT * FROM some_table', + timestamp_bound_mode=TimestampBoundMode.MAX_STALENESS, + exact_staleness=3, + time_unit=TimeUnit.HOURS, + ).with_output_types(ExampleRow)) + + Experimental; no backwards compatibility guarantees. + """ + def __init__( + self, + project_id, + instance_id, + database_id, + row_type=None, + sql=None, + table=None, + host=None, + emulator_host=None, + batching=None, + timestamp_bound_mode=None, + read_timestamp=None, + exact_staleness=None, + time_unit=None, + expansion_service=None, + ): + """ + Initializes a read operation from Spanner. + + :param project_id: Specifies the Cloud Spanner project. + :param instance_id: Specifies the Cloud Spanner instance. + :param database_id: Specifies the Cloud Spanner database. + :param row_type: Row type that fits the given query or table. Passed as + NamedTuple, e.g. NamedTuple('name', [('row_name', unicode)]) + :param sql: An sql query to execute. It's results must fit the + provided row_type. Don't use when table is set. + :param table: A spanner table. When provided all columns from row_type + will be selected to query. Don't use when query is set. + :param batching: By default Batch API is used to read data from Cloud + Spanner. It is useful to disable batching when the underlying query + is not root-partitionable. + :param host: Specifies the Cloud Spanner host. + :param emulator_host: Specifies Spanner emulator host. + :param timestamp_bound_mode: Defines how Cloud Spanner will choose a + timestamp for a read-only transaction or a single read/query. + Passed as TimestampBoundMode enum. Possible values: + STRONG: A timestamp bound that will perform reads and queries at a + timestamp where all previously committed transactions are visible. + READ_TIMESTAMP: Returns a timestamp bound that will perform reads + and queries at the given timestamp. + MIN_READ_TIMESTAMP: Returns a timestamp bound that will perform reads + and queries at a timestamp chosen to be at least given timestamp value. + EXACT_STALENESS: Returns a timestamp bound that will perform reads and + queries at an exact staleness. The timestamp is chosen soon after the + read is started. + MAX_STALENESS: Returns a timestamp bound that will perform reads and + queries at a timestamp chosen to be at most time_unit stale. + :param read_timestamp: Timestamp in string. Use only when + timestamp_bound_mode is set to READ_TIMESTAMP or MIN_READ_TIMESTAMP. + :param exact_staleness: Staleness value as int. Use only when + timestamp_bound_mode is set to EXACT_STALENESS or MAX_STALENESS. + time_unit has to be set along with this param. + :param time_unit: Time unit for staleness_value passed as TimeUnit enum. + Possible values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, + HOURS, DAYS. + :param expansion_service: The address (host:port) of the ExpansionService. + """ + assert row_type + assert sql or table and not (sql and table) + staleness_value = int(exact_staleness) if exact_staleness else None + + if staleness_value or time_unit: + assert staleness_value and time_unit and \ + timestamp_bound_mode is TimestampBoundMode.MAX_STALENESS or \ + timestamp_bound_mode is TimestampBoundMode.EXACT_STALENESS + + if read_timestamp: + assert timestamp_bound_mode is TimestampBoundMode.MIN_READ_TIMESTAMP\ + or timestamp_bound_mode is TimestampBoundMode.READ_TIMESTAMP + + coders.registry.register_coder(row_type, coders.RowCoder) + + super(ReadFromSpanner, self).__init__( + _READ_URN, + NamedTupleBasedPayloadBuilder( + ReadFromSpannerSchema( + instance_id=instance_id, + database_id=database_id, + sql=sql, + table=table, + schema=named_tuple_to_schema(row_type).SerializeToString(), + project_id=project_id, + host=host, + emulator_host=emulator_host, + batching=batching, + timestamp_bound_mode=_get_enum_name(timestamp_bound_mode), + read_timestamp=read_timestamp, + exact_staleness=exact_staleness, + time_unit=_get_enum_name(time_unit), + ), + ), + expansion_service or default_io_expansion_service(), + ) + + +class WriteToSpannerSchema(NamedTuple): + project_id: unicode + instance_id: unicode + database_id: unicode + max_batch_size_bytes: Optional[int] + max_number_mutations: Optional[int] + max_number_rows: Optional[int] + grouping_factor: Optional[int] + host: Optional[unicode] + emulator_host: Optional[unicode] + commit_deadline: Optional[int] + max_cumulative_backoff: Optional[int] + + +_CLASS_DOC = \ + """ + A PTransform which writes {0} mutations to the specified Spanner table. + + This transform receives rows defined as NamedTuple. Example:: + + {1} = typing.NamedTuple('{1}', + [('id', int), ('name', unicode)]) + + with Pipeline() as p: + _ = ( + p + | 'Impulse' >> beam.Impulse() + | 'Generate' >> beam.FlatMap(lambda x: range(num_rows)) + | 'To row' >> beam.Map(lambda n: {1}(n, str(n)) + .with_output_types({2}) + | 'Write to Spanner' >> Spanner{3}( + instance_id='your_instance', + database_id='existing_database', + project_id='your_project_id', + table='your_table')) + + Experimental; no backwards compatibility guarantees. + """ + +_INIT_DOC = \ + """ + Initializes {} operation to a Spanner table. + + :param project_id: Specifies the Cloud Spanner project. + :param instance_id: Specifies the Cloud Spanner instance. + :param database_id: Specifies the Cloud Spanner database. + :param table: Specifies the Cloud Spanner table. + :param max_batch_size_bytes: Specifies the batch size limit (max number of + bytes mutated per batch). Default value is 1048576 bytes = 1MB. + :param max_number_mutations: Specifies the cell mutation limit (maximum + number of mutated cells per batch). Default value is 5000. + :param max_number_rows: Specifies the row mutation limit (maximum number of + mutated rows per batch). Default value is 500. + :param grouping_factor: Specifies the multiple of max mutation (in terms + of both bytes per batch and cells per batch) that is used to select a + set of mutations to sort by key for batching. This sort uses local + memory on the workers, so using large values can cause out of memory + errors. Default value is 1000. + :param host: Specifies the Cloud Spanner host. + :param emulator_host: Specifies Spanner emulator host. + :param commit_deadline: Specifies the deadline for the Commit API call. + Default is 15 secs. DEADLINE_EXCEEDED errors will prompt a backoff/retry + until the value of commit_deadline is reached. DEADLINE_EXCEEDED errors + are ar reported with logging and counters. Pass seconds as value. + :param max_cumulative_backoff: Specifies the maximum cumulative backoff + time when retrying after DEADLINE_EXCEEDED errors. Default is 900s + (15min). If the mutations still have not been written after this time, + they are treated as a failure, and handled according to the setting of + failure_mode. Pass seconds as value. + :param expansion_service: The address (host:port) of the ExpansionService. + """ + + +def _add_doc(value, *args): + def _doc(obj): + obj.__doc__ = value.format(*args) + return obj + + return _doc + + +@_add_doc(_CLASS_DOC, 'delete', 'ExampleKey', 'List[ExampleKey]', 'Delete') +class SpannerDelete(PTransform): + @_add_doc(_INIT_DOC, 'a delete') + def __init__( + self, + project_id, + instance_id, + database_id, + table, + max_batch_size_bytes=None, + max_number_mutations=None, + max_number_rows=None, + grouping_factor=None, + host=None, + emulator_host=None, + commit_deadline=None, + max_cumulative_backoff=None, + expansion_service=None, + ): + super().__init__() + max_cumulative_backoff = int( + max_cumulative_backoff) if max_cumulative_backoff else None + commit_deadline = int(commit_deadline) if commit_deadline else None + self.table = table + self.params = WriteToSpannerSchema( + project_id=project_id, + instance_id=instance_id, + database_id=database_id, + max_batch_size_bytes=max_batch_size_bytes, + max_number_mutations=max_number_mutations, + max_number_rows=max_number_rows, + grouping_factor=grouping_factor, + host=host, + emulator_host=emulator_host, + commit_deadline=commit_deadline, + max_cumulative_backoff=max_cumulative_backoff, + ) + self.expansion_service = expansion_service or default_io_expansion_service() + + def expand(self, pbegin): + return _apply_write_transform( + pbegin, + _RowToMutation(_Operation.DELETE, self.table), + self.params, + self.expansion_service) + + +@_add_doc(_CLASS_DOC, 'insert', 'ExampleRow', 'ExampleRow', 'Insert') +class SpannerInsert(PTransform): + @_add_doc(_INIT_DOC, 'an insert') + def __init__( + self, + project_id, + instance_id, + database_id, + table, + max_batch_size_bytes=None, + max_number_mutations=None, + max_number_rows=None, + grouping_factor=None, + host=None, + emulator_host=None, + commit_deadline=None, + max_cumulative_backoff=None, + expansion_service=None, + ): + super().__init__() + max_cumulative_backoff = int( + max_cumulative_backoff) if max_cumulative_backoff else None + commit_deadline = int(commit_deadline) if commit_deadline else None + self.table = table + self.params = WriteToSpannerSchema( + project_id=project_id, + instance_id=instance_id, + database_id=database_id, + max_batch_size_bytes=max_batch_size_bytes, + max_number_mutations=max_number_mutations, + max_number_rows=max_number_rows, + grouping_factor=grouping_factor, + host=host, + emulator_host=emulator_host, + commit_deadline=commit_deadline, + max_cumulative_backoff=max_cumulative_backoff, + ) + self.expansion_service = expansion_service or default_io_expansion_service() + + def expand(self, pbegin): + return _apply_write_transform( + pbegin, + _RowToMutation(_Operation.INSERT, self.table), + self.params, + self.expansion_service) + + +@_add_doc(_CLASS_DOC, 'replace', 'ExampleRow', 'ExampleRow', 'Replace') +class SpannerReplace(PTransform): + @_add_doc(_INIT_DOC, 'a replace') + def __init__( + self, + project_id, + instance_id, + database_id, + table, + max_batch_size_bytes=None, + max_number_mutations=None, + max_number_rows=None, + grouping_factor=None, + host=None, + emulator_host=None, + commit_deadline=None, + max_cumulative_backoff=None, + expansion_service=None, + ): + super().__init__() + max_cumulative_backoff = int( + max_cumulative_backoff) if max_cumulative_backoff else None + commit_deadline = int(commit_deadline) if commit_deadline else None + self.table = table + self.params = WriteToSpannerSchema( + project_id=project_id, + instance_id=instance_id, + database_id=database_id, + max_batch_size_bytes=max_batch_size_bytes, + max_number_mutations=max_number_mutations, + max_number_rows=max_number_rows, + grouping_factor=grouping_factor, + host=host, + emulator_host=emulator_host, + commit_deadline=commit_deadline, + max_cumulative_backoff=max_cumulative_backoff, + ) + self.expansion_service = expansion_service or default_io_expansion_service() + + def expand(self, pbegin): + return _apply_write_transform( + pbegin, + _RowToMutation(_Operation.REPLACE, self.table), + self.params, + self.expansion_service) + + +@_add_doc( + _CLASS_DOC, + 'insert-or-update', + 'ExampleRow', + 'ExampleRow', + 'InsertOrUpdate') +class SpannerInsertOrUpdate(PTransform): + @_add_doc(_INIT_DOC, 'an insert-or-update') + def __init__( + self, + project_id, + instance_id, + database_id, + table, + max_batch_size_bytes=None, + max_number_mutations=None, + max_number_rows=None, + grouping_factor=None, + host=None, + emulator_host=None, + commit_deadline=None, + max_cumulative_backoff=None, + expansion_service=None, + ): + super().__init__() + max_cumulative_backoff = int( + max_cumulative_backoff) if max_cumulative_backoff else None + commit_deadline = int(commit_deadline) if commit_deadline else None + self.table = table + self.params = WriteToSpannerSchema( + project_id=project_id, + instance_id=instance_id, + database_id=database_id, + max_batch_size_bytes=max_batch_size_bytes, + max_number_mutations=max_number_mutations, + max_number_rows=max_number_rows, + grouping_factor=grouping_factor, + host=host, + emulator_host=emulator_host, + commit_deadline=commit_deadline, + max_cumulative_backoff=max_cumulative_backoff, + ) + self.expansion_service = expansion_service or default_io_expansion_service() + + def expand(self, pbegin): + return _apply_write_transform( + pbegin, + _RowToMutation(_Operation.INSERT_OR_UPDATE, self.table), + self.params, + self.expansion_service) + + +@_add_doc(_CLASS_DOC, 'update', 'ExampleRow', 'ExampleRow', 'Update') +class SpannerUpdate(PTransform): + @_add_doc(_INIT_DOC, 'an update') + def __init__( + self, + project_id, + instance_id, + database_id, + table, + max_batch_size_bytes=None, + max_number_mutations=None, + max_number_rows=None, + grouping_factor=None, + host=None, + emulator_host=None, + commit_deadline=None, + max_cumulative_backoff=None, + expansion_service=None, + ): + super().__init__() + max_cumulative_backoff = int( + max_cumulative_backoff) if max_cumulative_backoff else None + commit_deadline = int(commit_deadline) if commit_deadline else None + self.table = table + self.params = WriteToSpannerSchema( + project_id=project_id, + instance_id=instance_id, + database_id=database_id, + max_batch_size_bytes=max_batch_size_bytes, + max_number_mutations=max_number_mutations, + max_number_rows=max_number_rows, + grouping_factor=grouping_factor, + host=host, + emulator_host=emulator_host, + commit_deadline=commit_deadline, + max_cumulative_backoff=max_cumulative_backoff, + ) + self.expansion_service = expansion_service or default_io_expansion_service() + + def expand(self, pbegin): + return _apply_write_transform( + pbegin, + _RowToMutation(_Operation.UPDATE, self.table), + self.params, + self.expansion_service) + + +def _apply_write_transform(pbegin, to_mutation, params, expansion_service): + return ( + pbegin + | to_mutation + | ExternalTransform( + _WRITE_URN, NamedTupleBasedPayloadBuilder(params), expansion_service)) + + +class _RowToMutation(PTransform): Review comment: As pointed out in the last comment I think it would be preferable if Python didn't even need to have a concept of Mutations (for now). Instead it just sends the Rows (or Keysets) over to Java, which can wrap them in mutations for use in SpannerIO ########## File path: sdks/python/apache_beam/io/gcp/spanner.py ########## @@ -0,0 +1,662 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PTransforms for supporting Spanner in Python pipelines. + + These transforms are currently supported by Beam portable + Flink and Spark runners. + + **Setup** + + Transforms provided in this module are cross-language transforms + implemented in the Beam Java SDK. During the pipeline construction, Python SDK + will connect to a Java expansion service to expand these transforms. + To facilitate this, a small amount of setup is needed before using these + transforms in a Beam Python pipeline. + + There are several ways to setup cross-language Spanner transforms. + + * Option 1: use the default expansion service + * Option 2: specify a custom expansion service + + See below for details regarding each of these options. + + *Option 1: Use the default expansion service* + + This is the recommended and easiest setup option for using Python Spanner + transforms. This option is only available for Beam 2.26.0 and later. + + This option requires following pre-requisites before running the Beam + pipeline. + + * Install Java runtime in the computer from where the pipeline is constructed + and make sure that 'java' command is available. + + In this option, Python SDK will either download (for released Beam version) or + build (when running from a Beam Git clone) a expansion service jar and use + that to expand transforms. Currently Spanner transforms use the + 'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this + purpose. + + *Option 2: specify a custom expansion service* + + In this option, you startup your own expansion service and provide that as + a parameter when using the transforms provided in this module. + + This option requires following pre-requisites before running the Beam + pipeline. + + * Startup your own expansion service. + * Update your pipeline to provide the expansion service address when + initiating Spanner transforms provided in this module. + + Flink Users can use the built-in Expansion Service of the Flink Runner's + Job Server. If you start Flink's Job Server, the expansion service will be + started on port 8097. For a different address, please set the + expansion_service parameter. + + **More information** + + For more information regarding cross-language transforms see: + - https://beam.apache.org/roadmap/portability/ + + For more information specific to Flink runner see: + - https://beam.apache.org/documentation/runners/flink/ +""" + +# pytype: skip-file + +from __future__ import absolute_import + +import uuid +from enum import Enum +from enum import auto +from typing import List +from typing import NamedTuple +from typing import Optional + +from past.builtins import unicode + +from apache_beam import Map +from apache_beam import PTransform +from apache_beam import coders +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder +from apache_beam.typehints.schemas import named_tuple_from_schema +from apache_beam.typehints.schemas import named_tuple_to_schema +from apache_beam.typehints.schemas import schema_from_element_type + +__all__ = [ + 'ReadFromSpanner', + 'SpannerDelete', + 'SpannerInsert', + 'SpannerInsertOrUpdate', + 'SpannerReplace', + 'SpannerUpdate', + 'TimestampBoundMode', + 'TimeUnit', +] + + +def default_io_expansion_service(): + return BeamJarExpansionService( + 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar') + + +_READ_URN = 'beam:external:java:spanner:read:v1' +_WRITE_URN = 'beam:external:java:spanner:write:v1' + + +class TimeUnit(Enum): + NANOSECONDS = auto() + MICROSECONDS = auto() + MILLISECONDS = auto() + SECONDS = auto() + HOURS = auto() + DAYS = auto() + + +class TimestampBoundMode(Enum): + MAX_STALENESS = auto() + EXACT_STALENESS = auto() + READ_TIMESTAMP = auto() + MIN_READ_TIMESTAMP = auto() + STRONG = auto() + + +class ReadFromSpannerSchema(NamedTuple): + instance_id: unicode + database_id: unicode + schema: bytes + sql: Optional[unicode] + table: Optional[unicode] + project_id: Optional[unicode] + host: Optional[unicode] + emulator_host: Optional[unicode] + batching: Optional[bool] + timestamp_bound_mode: Optional[unicode] + read_timestamp: Optional[unicode] + exact_staleness: Optional[int] + time_unit: Optional[unicode] + + +class ReadFromSpanner(ExternalTransform): + """ + A PTransform which reads from the specified Spanner instance's database. + + This transform required type of the row it has to return to provide the + schema. Example:: + + ExampleRow = typing.NamedTuple('ExampleRow', + [('id', int), ('name', unicode)]) + + with Pipeline() as p: + result = ( + p + | ReadFromSpanner( + instance_id='your_instance_id', + database_id='your_database_id', + project_id='your_project_id', + row_type=ExampleRow, + query='SELECT * FROM some_table', + timestamp_bound_mode=TimestampBoundMode.MAX_STALENESS, + exact_staleness=3, + time_unit=TimeUnit.HOURS, + ).with_output_types(ExampleRow)) + + Experimental; no backwards compatibility guarantees. + """ + def __init__( + self, + project_id, + instance_id, + database_id, + row_type=None, + sql=None, + table=None, + host=None, + emulator_host=None, + batching=None, + timestamp_bound_mode=None, + read_timestamp=None, + exact_staleness=None, + time_unit=None, + expansion_service=None, + ): + """ + Initializes a read operation from Spanner. + + :param project_id: Specifies the Cloud Spanner project. + :param instance_id: Specifies the Cloud Spanner instance. + :param database_id: Specifies the Cloud Spanner database. + :param row_type: Row type that fits the given query or table. Passed as + NamedTuple, e.g. NamedTuple('name', [('row_name', unicode)]) + :param sql: An sql query to execute. It's results must fit the + provided row_type. Don't use when table is set. + :param table: A spanner table. When provided all columns from row_type + will be selected to query. Don't use when query is set. + :param batching: By default Batch API is used to read data from Cloud + Spanner. It is useful to disable batching when the underlying query + is not root-partitionable. + :param host: Specifies the Cloud Spanner host. + :param emulator_host: Specifies Spanner emulator host. + :param timestamp_bound_mode: Defines how Cloud Spanner will choose a + timestamp for a read-only transaction or a single read/query. + Passed as TimestampBoundMode enum. Possible values: + STRONG: A timestamp bound that will perform reads and queries at a + timestamp where all previously committed transactions are visible. + READ_TIMESTAMP: Returns a timestamp bound that will perform reads + and queries at the given timestamp. + MIN_READ_TIMESTAMP: Returns a timestamp bound that will perform reads + and queries at a timestamp chosen to be at least given timestamp value. + EXACT_STALENESS: Returns a timestamp bound that will perform reads and + queries at an exact staleness. The timestamp is chosen soon after the + read is started. + MAX_STALENESS: Returns a timestamp bound that will perform reads and + queries at a timestamp chosen to be at most time_unit stale. + :param read_timestamp: Timestamp in string. Use only when + timestamp_bound_mode is set to READ_TIMESTAMP or MIN_READ_TIMESTAMP. + :param exact_staleness: Staleness value as int. Use only when + timestamp_bound_mode is set to EXACT_STALENESS or MAX_STALENESS. + time_unit has to be set along with this param. + :param time_unit: Time unit for staleness_value passed as TimeUnit enum. + Possible values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, + HOURS, DAYS. + :param expansion_service: The address (host:port) of the ExpansionService. + """ + assert row_type + assert sql or table and not (sql and table) + staleness_value = int(exact_staleness) if exact_staleness else None + + if staleness_value or time_unit: + assert staleness_value and time_unit and \ + timestamp_bound_mode is TimestampBoundMode.MAX_STALENESS or \ + timestamp_bound_mode is TimestampBoundMode.EXACT_STALENESS + + if read_timestamp: + assert timestamp_bound_mode is TimestampBoundMode.MIN_READ_TIMESTAMP\ + or timestamp_bound_mode is TimestampBoundMode.READ_TIMESTAMP + + coders.registry.register_coder(row_type, coders.RowCoder) + + super(ReadFromSpanner, self).__init__( + _READ_URN, + NamedTupleBasedPayloadBuilder( + ReadFromSpannerSchema( + instance_id=instance_id, + database_id=database_id, + sql=sql, + table=table, + schema=named_tuple_to_schema(row_type).SerializeToString(), + project_id=project_id, + host=host, + emulator_host=emulator_host, + batching=batching, + timestamp_bound_mode=_get_enum_name(timestamp_bound_mode), + read_timestamp=read_timestamp, + exact_staleness=exact_staleness, + time_unit=_get_enum_name(time_unit), + ), + ), + expansion_service or default_io_expansion_service(), + ) + + +class WriteToSpannerSchema(NamedTuple): + project_id: unicode + instance_id: unicode + database_id: unicode + max_batch_size_bytes: Optional[int] + max_number_mutations: Optional[int] + max_number_rows: Optional[int] + grouping_factor: Optional[int] + host: Optional[unicode] + emulator_host: Optional[unicode] + commit_deadline: Optional[int] + max_cumulative_backoff: Optional[int] + + +_CLASS_DOC = \ + """ + A PTransform which writes {0} mutations to the specified Spanner table. + + This transform receives rows defined as NamedTuple. Example:: + + {1} = typing.NamedTuple('{1}', + [('id', int), ('name', unicode)]) + + with Pipeline() as p: + _ = ( + p + | 'Impulse' >> beam.Impulse() + | 'Generate' >> beam.FlatMap(lambda x: range(num_rows)) + | 'To row' >> beam.Map(lambda n: {1}(n, str(n)) + .with_output_types({2}) + | 'Write to Spanner' >> Spanner{3}( + instance_id='your_instance', + database_id='existing_database', + project_id='your_project_id', + table='your_table')) + + Experimental; no backwards compatibility guarantees. + """ + +_INIT_DOC = \ + """ + Initializes {} operation to a Spanner table. + + :param project_id: Specifies the Cloud Spanner project. + :param instance_id: Specifies the Cloud Spanner instance. + :param database_id: Specifies the Cloud Spanner database. + :param table: Specifies the Cloud Spanner table. + :param max_batch_size_bytes: Specifies the batch size limit (max number of + bytes mutated per batch). Default value is 1048576 bytes = 1MB. + :param max_number_mutations: Specifies the cell mutation limit (maximum + number of mutated cells per batch). Default value is 5000. + :param max_number_rows: Specifies the row mutation limit (maximum number of + mutated rows per batch). Default value is 500. + :param grouping_factor: Specifies the multiple of max mutation (in terms + of both bytes per batch and cells per batch) that is used to select a + set of mutations to sort by key for batching. This sort uses local + memory on the workers, so using large values can cause out of memory + errors. Default value is 1000. + :param host: Specifies the Cloud Spanner host. + :param emulator_host: Specifies Spanner emulator host. + :param commit_deadline: Specifies the deadline for the Commit API call. + Default is 15 secs. DEADLINE_EXCEEDED errors will prompt a backoff/retry + until the value of commit_deadline is reached. DEADLINE_EXCEEDED errors + are ar reported with logging and counters. Pass seconds as value. + :param max_cumulative_backoff: Specifies the maximum cumulative backoff + time when retrying after DEADLINE_EXCEEDED errors. Default is 900s + (15min). If the mutations still have not been written after this time, + they are treated as a failure, and handled according to the setting of + failure_mode. Pass seconds as value. + :param expansion_service: The address (host:port) of the ExpansionService. + """ + + +def _add_doc(value, *args): + def _doc(obj): + obj.__doc__ = value.format(*args) + return obj + + return _doc + + +@_add_doc(_CLASS_DOC, 'delete', 'ExampleKey', 'List[ExampleKey]', 'Delete') Review comment: nice job keeping this concise :+1: My only nit is that it would be a bit more readable if you used keyword args and had named parameters in the docstring template. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
