[ 
https://issues.apache.org/jira/browse/BEAM-7246?focusedWorklogId=379318&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379318
 ]

ASF GitHub Bot logged work on BEAM-7246:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Jan/20 10:57
            Start Date: 30/Jan/20 10:57
    Worklog Time Spent: 10m 
      Work Description: mszb commented on pull request #10712: [BEAM-7246] 
Added Google Spanner Write Transform
URL: https://github.com/apache/beam/pull/10712#discussion_r372831190
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
 ##########
 @@ -581,3 +644,369 @@ def display_data(self):
                                            label='transaction')
 
     return res
+
+
+@experimental(extra_message="No backwards-compatibility guarantees.")
+class WriteToSpanner(PTransform):
+
+  def __init__(self, project_id, instance_id, database_id, pool=None,
+               credentials=None, max_batch_size_bytes=1048576):
+    """
+    A PTransform to write onto Google Cloud Spanner.
+
+    Args:
+      project_id: Cloud spanner project id. Be sure to use the Project ID,
+        not the Project Number.
+      instance_id: Cloud spanner instance id.
+      database_id: Cloud spanner database id.
+      max_batch_size_bytes: (optional) Split the mutation into batches to
+        reduce the number of transaction sent to Spanner. By default it is
+        set to 1 MB (1048576 Bytes).
+    """
+    self._configuration = _BeamSpannerConfiguration(
+        project=project_id, instance=instance_id, database=database_id,
+        credentials=credentials, pool=pool, snapshot_read_timestamp=None,
+        snapshot_exact_staleness=None
+    )
+    self._max_batch_size_bytes = max_batch_size_bytes
+    self._database_id = database_id
+    self._project_id = project_id
+    self._instance_id = instance_id
+    self._pool = pool
+
+  def display_data(self):
+    res = {
+        'project_id': DisplayDataItem(self._project_id, label='Project Id'),
+        'instance_id': DisplayDataItem(self._instance_id, label='Instance Id'),
+        'pool': DisplayDataItem(str(self._pool), label='Pool'),
+        'database': DisplayDataItem(self._database_id, label='Database'),
+        'batch_size': DisplayDataItem(self._max_batch_size_bytes,
+                                      label="Batch Size"),
+    }
+    return res
+
+  def expand(self, pcoll):
+    return (pcoll
+            | "make batches" >>
+            _WriteGroup(max_batch_size_bytes=self._max_batch_size_bytes)
+            | 'Writing to spanner' >> ParDo(
+                _WriteToSpannerDoFn(self._configuration)))
+
+
+class _Mutator(namedtuple('_Mutator', ["mutation", "operation", "kwargs"])):
+  __slots__ = ()
+
+  @property
+  def byte_size(self):
+    return self.mutation.ByteSize()
+
+
+class MutationGroup(deque):
+  """
+  A Bundle of Spanner Mutations (_Mutator).
+  """
+
+  @property
+  def byte_size(self):
+    s = 0
+    for m in self.__iter__():
+      s += m.byte_size
+    return s
+
+  def primary(self):
+    return next(self.__iter__())
+
+
+class WriteMutation(object):
+
+  _OPERATION_DELETE = "delete"
+  _OPERATION_INSERT = "insert"
+  _OPERATION_INSERT_OR_UPDATE = "insert_or_update"
+  _OPERATION_REPLACE = "replace"
+  _OPERATION_UPDATE = "update"
+
+  def __init__(self,
+               insert=None,
+               update=None,
+               insert_or_update=None,
+               replace=None,
+               delete=None,
+               columns=None,
+               values=None,
+               keyset=None):
+    """
+    A convenient class to create Spanner Mutations for Write. User can provide
+    the operation via constructor or via static methods.
+
+    Note: If a user passing the operation via construction, make sure that it
+    will only accept one operation at a time. For example, if a user passing
+    a table name in the `insert` parameter, and he also passes the `update`
+    parameter value, this will cause an error.
+
+    Args:
+      insert: (Optional) Name of the table in which rows will be inserted.
+      update: (Optional) Name of the table in which existing rows will be
+        updated.
+      insert_or_update: (Optional) Table name in which rows will be written.
+        Like insert, except that if the row already exists, then its column
+        values are overwritten with the ones provided. Any column values not
+        explicitly written are preserved.
+      replace: (Optional) Table name in which rows will be replaced. Like
+        insert, except that if the row already exists, it is deleted, and the
+        column values provided are inserted instead. Unlike `insert_or_update`,
+        this means any values not explicitly written become `NULL`.
+      delete: (Optional) Table name from which rows will be deleted. Succeeds
+        whether or not the named rows were present.
+      columns: The names of the columns in table to be written. The list of
+        columns must contain enough columns to allow Cloud Spanner to derive
+        values for all primary key columns in the row(s) to be modified.
+      values: The values to be written. `values` can contain more than one
+        list of values. If it does, then multiple rows are written, one for
+        each entry in `values`. Each list in `values` must have exactly as
+        many entries as there are entries in columns above. Sending multiple
+        lists is equivalent to sending multiple Mutations, each containing one
+        `values` entry and repeating table and columns.
+      keyset: (Optional) The primary keys of the rows within table to delete.
+        Delete is idempotent. The transaction will succeed even if some or
+        all rows do not exist.
+    """
+    self._columns = columns
+    self._values = values
+    self._keyset = keyset
+
+    self._insert = insert
+    self._update = update
+    self._insert_or_update = insert_or_update
+    self._replace = replace
+    self._delete = delete
+
+    if sum([
+        1 for x in [self._insert, self._update, self._insert_or_update,
+                    self._replace, self._delete]
+        if x is not None
+    ]) != 1:
+      raise ValueError("No or more than one write mutation operation "
+                       "provided: <%s: %s>" % (self.__class__.__name__,
+                                               str(self.__dict__)))
+
+  def __call__(self, *args, **kwargs):
 
 Review comment:
   Since we don't use the builder pattern in Python, I use this magic method to 
construct the mutation object. This class also has static methods, which I 
thought would be more convenient for the user. 
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 379318)
    Time Spent: 15h  (was: 14h 50m)

> Create a Spanner IO for Python
> ------------------------------
>
>                 Key: BEAM-7246
>                 URL: https://issues.apache.org/jira/browse/BEAM-7246
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Reuven Lax
>            Assignee: Shehzaad Nakhoda
>            Priority: Major
>          Time Spent: 15h
>  Remaining Estimate: 0h
>
> Add I/O support for Google Cloud Spanner for the Python SDK (Batch Only).
> Testing in this work item will be in the form of DirectRunner tests and 
> manual testing.
> Integration and performance tests are a separate work item (not included 
> here).
> See https://beam.apache.org/documentation/io/built-in/. The goal is to add 
> Google Cloud Spanner to the Database column for the Python/Batch row.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to