This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6593a0f15c1 Adding failure_mode parameter to Spanner Python transforms 
(#29529)
6593a0f15c1 is described below

commit 6593a0f15c1a3cc9813059100425d5716eba0193
Author: Pablo Estrada <[email protected]>
AuthorDate: Sat Dec 2 08:13:54 2023 -0800

    Adding failure_mode parameter to Spanner Python transforms (#29529)
    
    * Adding failure_mode parameter to Spanner Python transforms
    
    * fix formatting
    
    * Improve pydoc
    
    * fixup docs
---
 .../io/gcp/spanner/SpannerTransformRegistrar.java   | 10 ++++++++++
 sdks/python/apache_beam/io/gcp/spanner.py           | 21 +++++++++++++++++++++
 .../io/gcp/tests/xlang_spannerio_it_test.py         |  1 +
 3 files changed, 32 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
index 38cd97da860..809d7a27551 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
@@ -288,6 +288,7 @@ public class SpannerTransformRegistrar implements 
ExternalTransformRegistrar {
       private @Nullable Integer groupingFactor;
       private @Nullable Duration commitDeadline;
       private @Nullable Duration maxCumulativeBackoff;
+      private @Nullable String failureMode;
 
       public void setTable(String table) {
         this.table = table;
@@ -322,6 +323,10 @@ public class SpannerTransformRegistrar implements 
ExternalTransformRegistrar {
           this.maxCumulativeBackoff = 
Duration.standardSeconds(maxCumulativeBackoff);
         }
       }
+
+      public void setFailureMode(@Nullable String failureMode) {
+        this.failureMode = failureMode;
+      }
     }
 
     @Override
@@ -361,6 +366,11 @@ public class SpannerTransformRegistrar implements 
ExternalTransformRegistrar {
         writeTransform =
             
writeTransform.withMaxCumulativeBackoff(configuration.maxCumulativeBackoff);
       }
+      if (configuration.failureMode != null) {
+        writeTransform =
+            writeTransform.withFailureMode(
+                SpannerIO.FailureMode.valueOf(configuration.failureMode));
+      }
       return SpannerIO.WriteRows.of(writeTransform, operation, 
configuration.table);
     }
   }
diff --git a/sdks/python/apache_beam/io/gcp/spanner.py 
b/sdks/python/apache_beam/io/gcp/spanner.py
index c16daa4448b..51c7fc65c17 100644
--- a/sdks/python/apache_beam/io/gcp/spanner.py
+++ b/sdks/python/apache_beam/io/gcp/spanner.py
@@ -124,6 +124,11 @@ class TimestampBoundMode(Enum):
   STRONG = auto()
 
 
+class FailureMode(Enum):
+  FAIL_FAST = auto()
+  REPORT_FAILURES = auto()
+
+
 class ReadFromSpannerSchema(NamedTuple):
   instance_id: str
   database_id: str
@@ -282,6 +287,7 @@ class WriteToSpannerSchema(NamedTuple):
   emulator_host: Optional[str]
   commit_deadline: Optional[int]
   max_cumulative_backoff: Optional[int]
+  failure_mode: Optional[str]
 
 
 _CLASS_DOC = \
@@ -346,6 +352,11 @@ _INIT_DOC = \
       (15min). If the mutations still have not been written after this time,
       they are treated as a failure, and handled according to the setting of
       failure_mode. Pass seconds as value.
+  :param failure_mode: Specifies the behavior for mutations that fail to be
+      written to Spanner. Default is FAIL_FAST. When FAIL_FAST is set,
+      an exception will be thrown for any failed mutation. When REPORT_FAILURES
+      is set, processing will continue instead of throwing an exception. Note
+      that REPORT_FAILURES can cause data loss if used incorrectly.
   :param expansion_service: The address (host:port) of the ExpansionService.
   """
 
@@ -392,6 +403,7 @@ class SpannerDelete(ExternalTransform):
       emulator_host=None,
       commit_deadline=None,
       max_cumulative_backoff=None,
+      failure_mode=None,
       expansion_service=None,
   ):
     max_cumulative_backoff = int(
@@ -413,6 +425,7 @@ class SpannerDelete(ExternalTransform):
                 emulator_host=emulator_host,
                 commit_deadline=commit_deadline,
                 max_cumulative_backoff=max_cumulative_backoff,
+                failure_mode=_get_enum_name(failure_mode),
             ),
         ),
         expansion_service=expansion_service or default_io_expansion_service(),
@@ -445,6 +458,7 @@ class SpannerInsert(ExternalTransform):
       commit_deadline=None,
       max_cumulative_backoff=None,
       expansion_service=None,
+      failure_mode=None,
   ):
     max_cumulative_backoff = int(
         max_cumulative_backoff) if max_cumulative_backoff else None
@@ -465,6 +479,7 @@ class SpannerInsert(ExternalTransform):
                 emulator_host=emulator_host,
                 commit_deadline=commit_deadline,
                 max_cumulative_backoff=max_cumulative_backoff,
+                failure_mode=_get_enum_name(failure_mode),
             ),
         ),
         expansion_service=expansion_service or default_io_expansion_service(),
@@ -497,6 +512,7 @@ class SpannerReplace(ExternalTransform):
       commit_deadline=None,
       max_cumulative_backoff=None,
       expansion_service=None,
+      failure_mode=None,
   ):
     max_cumulative_backoff = int(
         max_cumulative_backoff) if max_cumulative_backoff else None
@@ -517,6 +533,7 @@ class SpannerReplace(ExternalTransform):
                 emulator_host=emulator_host,
                 commit_deadline=commit_deadline,
                 max_cumulative_backoff=max_cumulative_backoff,
+                failure_mode=_get_enum_name(failure_mode),
             ),
         ),
         expansion_service=expansion_service or default_io_expansion_service(),
@@ -548,6 +565,7 @@ class SpannerInsertOrUpdate(ExternalTransform):
       emulator_host=None,
       commit_deadline=None,
       max_cumulative_backoff=None,
+      failure_mode=None,
       expansion_service=None,
   ):
     max_cumulative_backoff = int(
@@ -569,6 +587,7 @@ class SpannerInsertOrUpdate(ExternalTransform):
                 emulator_host=emulator_host,
                 commit_deadline=commit_deadline,
                 max_cumulative_backoff=max_cumulative_backoff,
+                failure_mode=_get_enum_name(failure_mode),
             ),
         ),
         expansion_service=expansion_service or default_io_expansion_service(),
@@ -600,6 +619,7 @@ class SpannerUpdate(ExternalTransform):
       emulator_host=None,
       commit_deadline=None,
       max_cumulative_backoff=None,
+      failure_mode=None,
       expansion_service=None,
   ):
     max_cumulative_backoff = int(
@@ -621,6 +641,7 @@ class SpannerUpdate(ExternalTransform):
                 emulator_host=emulator_host,
                 commit_deadline=commit_deadline,
                 max_cumulative_backoff=max_cumulative_backoff,
+                failure_mode=_get_enum_name(failure_mode),
             ),
         ),
         expansion_service=expansion_service or default_io_expansion_service(),
diff --git a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py 
b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
index 5d701052965..43a74f17053 100644
--- a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
@@ -234,6 +234,7 @@ class CrossLanguageSpannerIOTest(unittest.TestCase):
               database_id=self.database_id,
               project_id=self.project_id,
               table=self.table,
+              failure_mode=beam.io.gcp.spanner.FailureMode.REPORT_FAILURES,
               emulator_host=self.spanner_helper.get_emulator_host(),
           ))
 

Reply via email to