This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6593a0f15c1 Adding failure_mode parameter to Spanner Python transforms
(#29529)
6593a0f15c1 is described below
commit 6593a0f15c1a3cc9813059100425d5716eba0193
Author: Pablo Estrada <[email protected]>
AuthorDate: Sat Dec 2 08:13:54 2023 -0800
Adding failure_mode parameter to Spanner Python transforms (#29529)
* Adding failure_mode parameter to Spanner Python transforms
* fix formatting
* Improve pydoc
* fixup docs
---
.../io/gcp/spanner/SpannerTransformRegistrar.java | 10 ++++++++++
sdks/python/apache_beam/io/gcp/spanner.py | 21 +++++++++++++++++++++
.../io/gcp/tests/xlang_spannerio_it_test.py | 1 +
3 files changed, 32 insertions(+)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
index 38cd97da860..809d7a27551 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
@@ -288,6 +288,7 @@ public class SpannerTransformRegistrar implements
ExternalTransformRegistrar {
private @Nullable Integer groupingFactor;
private @Nullable Duration commitDeadline;
private @Nullable Duration maxCumulativeBackoff;
+ private @Nullable String failureMode;
public void setTable(String table) {
this.table = table;
@@ -322,6 +323,10 @@ public class SpannerTransformRegistrar implements
ExternalTransformRegistrar {
this.maxCumulativeBackoff =
Duration.standardSeconds(maxCumulativeBackoff);
}
}
+
+ public void setFailureMode(@Nullable String failureMode) {
+ this.failureMode = failureMode;
+ }
}
@Override
@@ -361,6 +366,11 @@ public class SpannerTransformRegistrar implements
ExternalTransformRegistrar {
writeTransform =
writeTransform.withMaxCumulativeBackoff(configuration.maxCumulativeBackoff);
}
+ if (configuration.failureMode != null) {
+ writeTransform =
+ writeTransform.withFailureMode(
+ SpannerIO.FailureMode.valueOf(configuration.failureMode));
+ }
return SpannerIO.WriteRows.of(writeTransform, operation,
configuration.table);
}
}
diff --git a/sdks/python/apache_beam/io/gcp/spanner.py
b/sdks/python/apache_beam/io/gcp/spanner.py
index c16daa4448b..51c7fc65c17 100644
--- a/sdks/python/apache_beam/io/gcp/spanner.py
+++ b/sdks/python/apache_beam/io/gcp/spanner.py
@@ -124,6 +124,11 @@ class TimestampBoundMode(Enum):
STRONG = auto()
+class FailureMode(Enum):
+ FAIL_FAST = auto()
+ REPORT_FAILURES = auto()
+
+
class ReadFromSpannerSchema(NamedTuple):
instance_id: str
database_id: str
@@ -282,6 +287,7 @@ class WriteToSpannerSchema(NamedTuple):
emulator_host: Optional[str]
commit_deadline: Optional[int]
max_cumulative_backoff: Optional[int]
+ failure_mode: Optional[str]
_CLASS_DOC = \
@@ -346,6 +352,11 @@ _INIT_DOC = \
(15min). If the mutations still have not been written after this time,
they are treated as a failure, and handled according to the setting of
failure_mode. Pass seconds as value.
+ :param failure_mode: Specifies the behavior for mutations that fail to be
+ written to Spanner. Default is FAIL_FAST. When FAIL_FAST is set,
+ an exception will be thrown for any failed mutation. When REPORT_FAILURES
+ is set, processing will continue instead of throwing an exception. Note
+ that REPORT_FAILURES can cause data loss if used incorrectly.
:param expansion_service: The address (host:port) of the ExpansionService.
"""
@@ -392,6 +403,7 @@ class SpannerDelete(ExternalTransform):
emulator_host=None,
commit_deadline=None,
max_cumulative_backoff=None,
+ failure_mode=None,
expansion_service=None,
):
max_cumulative_backoff = int(
@@ -413,6 +425,7 @@ class SpannerDelete(ExternalTransform):
emulator_host=emulator_host,
commit_deadline=commit_deadline,
max_cumulative_backoff=max_cumulative_backoff,
+ failure_mode=_get_enum_name(failure_mode),
),
),
expansion_service=expansion_service or default_io_expansion_service(),
@@ -445,6 +458,7 @@ class SpannerInsert(ExternalTransform):
commit_deadline=None,
max_cumulative_backoff=None,
expansion_service=None,
+ failure_mode=None,
):
max_cumulative_backoff = int(
max_cumulative_backoff) if max_cumulative_backoff else None
@@ -465,6 +479,7 @@ class SpannerInsert(ExternalTransform):
emulator_host=emulator_host,
commit_deadline=commit_deadline,
max_cumulative_backoff=max_cumulative_backoff,
+ failure_mode=_get_enum_name(failure_mode),
),
),
expansion_service=expansion_service or default_io_expansion_service(),
@@ -497,6 +512,7 @@ class SpannerReplace(ExternalTransform):
commit_deadline=None,
max_cumulative_backoff=None,
expansion_service=None,
+ failure_mode=None,
):
max_cumulative_backoff = int(
max_cumulative_backoff) if max_cumulative_backoff else None
@@ -517,6 +533,7 @@ class SpannerReplace(ExternalTransform):
emulator_host=emulator_host,
commit_deadline=commit_deadline,
max_cumulative_backoff=max_cumulative_backoff,
+ failure_mode=_get_enum_name(failure_mode),
),
),
expansion_service=expansion_service or default_io_expansion_service(),
@@ -548,6 +565,7 @@ class SpannerInsertOrUpdate(ExternalTransform):
emulator_host=None,
commit_deadline=None,
max_cumulative_backoff=None,
+ failure_mode=None,
expansion_service=None,
):
max_cumulative_backoff = int(
@@ -569,6 +587,7 @@ class SpannerInsertOrUpdate(ExternalTransform):
emulator_host=emulator_host,
commit_deadline=commit_deadline,
max_cumulative_backoff=max_cumulative_backoff,
+ failure_mode=_get_enum_name(failure_mode),
),
),
expansion_service=expansion_service or default_io_expansion_service(),
@@ -600,6 +619,7 @@ class SpannerUpdate(ExternalTransform):
emulator_host=None,
commit_deadline=None,
max_cumulative_backoff=None,
+ failure_mode=None,
expansion_service=None,
):
max_cumulative_backoff = int(
@@ -621,6 +641,7 @@ class SpannerUpdate(ExternalTransform):
emulator_host=emulator_host,
commit_deadline=commit_deadline,
max_cumulative_backoff=max_cumulative_backoff,
+ failure_mode=_get_enum_name(failure_mode),
),
),
expansion_service=expansion_service or default_io_expansion_service(),
diff --git a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
index 5d701052965..43a74f17053 100644
--- a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py
@@ -234,6 +234,7 @@ class CrossLanguageSpannerIOTest(unittest.TestCase):
database_id=self.database_id,
project_id=self.project_id,
table=self.table,
+ failure_mode=beam.io.gcp.spanner.FailureMode.REPORT_FAILURES,
emulator_host=self.spanner_helper.get_emulator_host(),
))