This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 31ed331 Merge pull request #16977 from [BEAM-12164] Added
integration test for transaction boundaries and transaction ID ordering.
31ed331 is described below
commit 31ed3311711d64dc5588fea0e555122c4f2cbb4c
Author: nancyxu123 <[email protected]>
AuthorDate: Fri Mar 18 04:25:45 2022 +0000
Merge pull request #16977 from [BEAM-12164] Added integration test for
transaction boundaries and transaction ID ordering.
* Added integration test for transaction boundaries and transaction ID
ordering. Made small fixes in ordered by key integration test.
* [BEAM-9150] Fix beam_PostRelease_Python_Candidate (python RC validation
scripts) (#16955)
* Use default context output rather than outputWithTimestamp for
ElasticsearchIO
* Palo Alto case study - fix link
* [BEAM-12777] Removed current docs version redirect
* Merge pull request #16850: [BEAM-11205] Upgrade Libraries BOM
dependencies to 24.3.0
* Update GCP Libraries BOM version to 24.3.0
* Update associated dependencies
* Merge pull request #16484 from [BEAM-13633] [Playground] Implement method
to get a default example for each SDKs
* Implement method to get a default example for each SDKs
* Add error handling
* Added saving of precompiled objects catalog to cache at the server startup
* Added caching of the catalog only in case of unspecified SDK
* Update regarding comments
* Update regarding comments
* Simplified logging regarding comment
* Get defaultExamplePath from the corresponding config
* Refactoring code
* Add the `link` field to response
* Remove gjson;
Resolve conflicts;
* Refactoring code
* Getting default precompiled object from cache
* Refactoring code
* Added saving of precompiled objects catalog to cache at the server startup
* Added caching of the catalog only in case of unspecified SDK
* Update regarding comments
* Update regarding comments
* Simplified logging regarding comment
* Updates regarding comments
* Update for environment_service_test.go
* Get default example from catalog
* GetCatalogFromCacheOrStorage method
* Update licenses
* Update licenses;
Resolve conflicts;
* [BEAM-13633][Playground]
Change saving default precompiled objects to the cache
* [BEAM-13633][Playground]
Change logic of saving and receiving info about default precompiled objects
* [BEAM-13633][Playground]
Separate for each sdk
* [BEAM-13633][Playground]
regenerate proto files
* Add code of the default example to response
* Revert "Add code of the default example to response"
This reverts commit da6baa0aaa272190d4a035568a5e4db0b093dfd9.
* Refactoring code
* Refactoring code;
Add test;
* Edit commentaries
* Refactoring code
* Add bucket name to methods
Co-authored-by: Artur Khanin <[email protected]>
Co-authored-by: AydarZaynutdinov <[email protected]>
Co-authored-by: Pavel Avilov <pavel.avilov>
* Add 2022 events blog post (#16975)
* Clean up Go formatter suggestions (#16973)
* [BEAM-14012] Add go fmt to Github Actions (#16978)
* [BEAM-13911] Add basic tests to Go direct runner. (#16979)
* [BEAM-13960] Add support for more types when converting from between row
and proto (#16875)
* Adding schema support.
* Addressing feedback.
* Bump org.mongodb:mongo-java-driver to 3.12.10
* [BEAM-13973] Link Dataproc Flink master URLs to the InteractiveRunner
when FlinkRunner is used (#16904)
* [BEAM-13925] Turn pr bot on for go prs (#16984)
* [BEAM-13964] Bump kotlin to 1.6.x (#16882)
* [BEAM-13964] Bump kotlin to 1.6.x
* [BEAM-13964] Bump kotlin to 1.6.x
* [BEAM-13964] fix warnings in Kotlin compilation
* Skipping flaky sad-path tests for Spanner changestreams
* Merge pull request #16906: [BEAM-13974] Handle idle Storage Api streams
* Merge pull request #16562 from [BEAM-13051][D] Enable pylint warnings
(no-name-in-module/no-value-for-parameter)
* [BEAM-13051] Pylint no-name-in-module and no-value-for-parameter warnings
enabled
* [BEAM-13051] Fixed no-value-for-parameter warning for missing default
values
* [BEAM-13051] Fixed parameters warnings
* [BEAM-13925] A couple small pr-bot bug fixes (#16996)
* [BEAM-14029] Add getter, setter for target maven repo (#16995)
* [BEAM-13903] Improve coverage of metricsx package (#16994)
* [BEAM-13892] Improve coverage of avroio package (#16990)
* [adhoc] Prepare aws2 ClientConfiguration for json serialization and
cleanup AWS Module (#16894)
* [adhoc] Prepare aws2 ClientConfiguration and related classes for json
serialization and cleanup AWS Module
* Merge pull request #16879 from [BEAM-12164] Add javadocs to SpannerConfig
* Add tests and config for retry
* lint
* add tests
* lint
* Delete tests not passing
* Rebase on apache beam master
* review changes
* review changes
* add javadocs to SpannerConfig
* revert
* add full stops
* [Cleanup] Update pre-v2 go package references (#17002)
* [BEAM-13885] Add unit tests to window package (#16971)
* Merge pull request #16891 from [BEAM-13872] [Playground] Increase test
coverage for the code_processing package
* Increase test coverage for the code_processing package
* Refactoring code
* Add test cases with mock cache
* Add test for processCompileSuccess method
* Update test names
* Refactoring code
* Merge pull request #16912 from [BEAM-13878] [Playground] Increase test
coverage for the fs_tool package
* Increase test coverage for the fs_tool package
* Rename folder
* Remove useless variable
* Update test names
* Merge pull request #16946 from [BEAM-13873] [Playground] Increase test
coverage for the environment package
* Increase test coverage for the environment package
* Update test names
* Refactoring code
* Add bucket name to method
* [BEAM-13999] playground - support vertical orientation for graph
* [BEAM-13951] Update mass_comment.py list of Run commands (#16889)
* BEAM-13951: Sort run command list
* BEAM-13951: Update list
* fixup! BEAM-13951: Update list
* [BEAM-10652] Allow Clustering without Partition in BigQuery (#16578)
* [BEAM-10652] removed check that blocked clustering without partitioning
* [BEAM-10652] allow clustering without requiring partition
* newline
* added needed null
* remove testClusteringThrowsWithoutPartitioning
* update clustering
* formatting
* now compiles
* passes spotless
* update doc
* focus on single test
* spotless
* run all ITs
* spotless
* testing with time partitioning
* checking
* set clustering independent of partitioning
* remove timepart from it
* spotless
* removed test
* added TODO
* removed block of unneeded code/comment
* remove override to v3 coder
* Spotless cleanup
* re-add override to v3 coder
* spotless
* adding checksum ( wrong value )
* added needed query var
* use tableName as var
* DATASET NAME
* project name in query
* update query
* change tests
* remove unneeded imports
* remove rest of forgotten
* add rows
* 16000 bytes
* bigint
* streaming test
* spotless
* methods
* end stream
* stream method and naming
* nostream
* streaming
* streamingoptions
* without streaming example
* string column instead of date -- related to BEAM-13753
* more strings
* spotless
* revert, only DEFAULT and FILE_LOADS
* [BEAM-13857] Add K:V flags for expansion service jars and addresses to Go
ITs. (#16908)
Adds functionality for running jars to the Go integration test framework,
and uses this functionality to implement handling of K:V flags for providing
expansion service jars and addresses to the test framework. This means that
tests can simply get the address of an expansion service with the appropriate
label, and this feature will handle running a jar if necessary, or just using
the passed in endpoint otherwise.
* BEAM-14011 fix s3 filesystem multipart copy
* Merge pull request #16842 from [BEAM-13932][Playground] Container's user
privileges
* [BEAM-13932][Playground]
Change Dockerfiles
* [BEAM-13932][Playground]
Update proxy and permissions for the container's user
* [BEAM-13932][Playground]
Update permissions for the container's user for scio
* Doc updates and blog post for 2.37.0 (#16887)
* Doc updates and blog post for 2.37.0
* Add BEAM-13980 to known issues
* Update dates
* Drop known issue (fix cherrypicked)
* Add license
* Add missing #
* Remove resolved issue in docs + update class path on sample (#17018)
* [BEAM-14016] Fixed flaky postcommit test (#17009)
Fixed SpannerWriteIntegrationTest.test_spanner_update by fixing the
metric exporter usage in spannerio.
* [BEAM-13925] months in date constructor are 0 indexed
* [BEAM-13947] Add split() and rsplit(), non-deferred column operations on
categorical columns (#16677)
* Add split/rsplit; Need to refactor regex
* Support Regex; Refactor tests
* Remove debugger
* fix grammar
* Fix passing regex arg
* Reorder imports
* Address PR comments; Simplify kwargs
* Simplify getting columns for split_cat
* Update doctests to skip expand=True operations
* Fix missing doctest
* py: Import beam plugins before starting SdkHarness
* BEAM-14026 - Fixes bug related to Unnesting nested rows in an array
(#16988)
* Suggested changes to handle nested row in an array
* Beam-14026 Suggested changes to handle nested row in an array
* Beam-14026 Enhanced by segregating the code from getBaseValues enhanced
test case and example.
* Beam-14026 The code is moved from Row to avoid impact to the public
interface.
The code is moved to BeamUnnestRel.java since it's the caller class.
The Example code was duplicate, hence dropped.
build.gradle updated with the removal of example code.
* Remove resolved issue in notebook
* Bump numpy bound to include 1.22 and regenerate container deps.
* [BEAM-13925] Add ability to get metrics on pr-bot performance (#16985)
* Add script to get metrics on pr-bot performance
* Respond to feedback
* fix bad condition
* [BEAM-11085] Test that windows are correctly observed in DoFns
* Give pr bot write permissions on pr update
* Adding a logical type for Schemas using proto serialization. (#16940)
* BEAM-13765 missing PAssert methods (#16668)
* [BEAM-13909] improve coverage of Provision package (#17014)
* improve coverage of provision package
* updated comments
* [BEAM-14050] Update taxi.go example instructions
* Merge pull request #17027: [BEAM-11205] Upgrade GCP Libraries BOM
dependencies to 24.4.0
* [BEAM-13709] Inconsistent behavior when parsing boolean flags across
different APIs in Python SDK (#16929)
* Update dataflow API client.
* Instructions for updating apitools generated files.
* [BEAM-10976] Bundle finalization: Harness and some exec changes (#16980)
* Bundle finalization harness side changes
* Add testing
* Iterate over pardos directly
* Track bundlefinalizer in plan.go not pardo
* Remove outdated test
* Fix pointer issue
* Update todos to reference jiras
* Cleanup from feedback
* Doc nit
Co-authored-by: Daniel Oliveira <[email protected]>
* GetExpirationTime comment
Co-authored-by: github-actions <[email protected]>
Co-authored-by: Daniel Oliveira <[email protected]>
* Merge pull request #16976 from [BEAM-14010] [Website] Add Playground
section to the Home page
* [BEAM-14010] [Website] Add Playground section to the Home page
* Update button to "Try Playground"
Co-authored-by: Aydar Zainutdinov <[email protected]>
* [BEAM-14010] [Website] change button name
* [BEAM-14010] [Website] align header to center
* [BEAM-14010] [Website] change link
Co-authored-by: Alex Kosolapov <[email protected]>
Co-authored-by: Aydar Zainutdinov <[email protected]>
* [BEAM-12447] Upgrade cloud build client and add/cleanup options (#17032)
* Merge pull request #17036 from [BEAM-12164] Convert all static instances
to be transient in the connector in order to enable concurrent testing
* Convert all static instances to be transient in the connector in order to
enable concurrent testing
* Initialized fields to null
* nullness
* Suppress uninitialized warnings
* Remove resetting dao factory fields in SpannerChangeStreamErrorTest.java
* Add validation package
* fix variable reference (#16991)
* Committed changes
* Print more logging
* More logging
* Made pipelines streaming
* Made small fixes
* Small fixes
* Ran spotless Apply
Co-authored-by: emily <[email protected]>
Co-authored-by: egalpin <[email protected]>
Co-authored-by: Aydar Farrakhov <[email protected]>
Co-authored-by: Miguel Hernandez <[email protected]>
Co-authored-by: Benjamin Gonzalez <[email protected]>
Co-authored-by: Pavel Avilov <[email protected]>
Co-authored-by: Artur Khanin <[email protected]>
Co-authored-by: AydarZaynutdinov <[email protected]>
Co-authored-by: Ahmet Altay <[email protected]>
Co-authored-by: Jack McCluskey <[email protected]>
Co-authored-by: Robert Burke <[email protected]>
Co-authored-by: laraschmidt <[email protected]>
Co-authored-by: Alexey Romanenko <[email protected]>
Co-authored-by: Victor <[email protected]>
Co-authored-by: Danny McCormick <[email protected]>
Co-authored-by: Masato Nakamura <[email protected]>
Co-authored-by: Pablo Estrada <[email protected]>
Co-authored-by: reuvenlax <[email protected]>
Co-authored-by: Miguel Hernandez <[email protected]>
Co-authored-by: Moritz Mack <[email protected]>
Co-authored-by: Zoe <[email protected]>
Co-authored-by: Brian Hulette <[email protected]>
Co-authored-by: brucearctor <[email protected]>
Co-authored-by: Daniel Oliveira <[email protected]>
Co-authored-by: sp029619 <[email protected]>
Co-authored-by: David Cavazos <[email protected]>
Co-authored-by: Ning Kang <[email protected]>
Co-authored-by: github-actions <[email protected]>
Co-authored-by: Andy Ye <[email protected]>
Co-authored-by: Rahul Iyer <[email protected]>
Co-authored-by: abhijeet-lele <[email protected]>
Co-authored-by: Valentyn Tymofieiev <[email protected]>
Co-authored-by: Marcin Kuthan <[email protected]>
Co-authored-by: Ritesh Ghorse <[email protected]>
Co-authored-by: Jack McCluskey <[email protected]>
Co-authored-by: ansh0l <[email protected]>
Co-authored-by: Anand Inguva <[email protected]>
Co-authored-by: Robert Bradshaw <[email protected]>
Co-authored-by: Daniel Oliveira <[email protected]>
Co-authored-by: bullet03 <[email protected]>
Co-authored-by: Alex Kosolapov <[email protected]>
Co-authored-by: Yichi Zhang <[email protected]>
Co-authored-by: Nancy Xu <[email protected]>
---
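The test asserts that, within each flushed batch, data change records appear in commit timestamp and transaction ID order; ToStringFn gets that order by sorting on SortKey before printing. The sketch below isolates that ordering. It is a hypothetical, dependency-free stand-in for SortKey, not the test's class: the real compareTo is cut off in this excerpt, so ordering by commit timestamp first and transaction ID second is an assumption based on the test's name and on how KeyBySortKeyFn builds the key. java.time.Instant replaces com.google.cloud.Timestamp.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortKeyOrderingSketch {

  // Hypothetical stand-in for the test's SortKey. java.time.Instant replaces
  // com.google.cloud.Timestamp so the sketch has no Spanner dependency.
  static final class SortKey implements Comparable<SortKey> {
    private static final Comparator<SortKey> ORDER =
        Comparator.comparing((SortKey k) -> k.commitTimestamp)
            .thenComparing(k -> k.transactionId);

    final Instant commitTimestamp;
    final String transactionId;

    SortKey(Instant commitTimestamp, String transactionId) {
      this.commitTimestamp = commitTimestamp;
      this.transactionId = transactionId;
    }

    @Override
    public int compareTo(SortKey other) {
      // Order by commit timestamp first; break ties by transaction ID (assumed).
      return ORDER.compare(this, other);
    }

    @Override
    public String toString() {
      return commitTimestamp + "/" + transactionId;
    }
  }

  // Returns a sorted copy, analogous to the sort ToStringFn applies before printing.
  static List<SortKey> sorted(List<SortKey> keys) {
    List<SortKey> copy = new ArrayList<>(keys);
    copy.sort(Comparator.naturalOrder());
    return copy;
  }

  public static void main(String[] args) {
    Instant t = Instant.parse("2022-03-18T04:25:45Z");
    List<SortKey> keys = new ArrayList<>();
    keys.add(new SortKey(t.plusSeconds(1), "a"));
    keys.add(new SortKey(t, "b"));
    keys.add(new SortKey(t, "a"));
    // Prints the two keys committed at t (IDs "a" then "b"), then the key at t + 1s.
    sorted(keys).forEach(System.out::println);
  }
}
```

Two records that share a commit timestamp are therefore still totally ordered, which is what lets the expected strings in the test be written down deterministically.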
...StreamOrderedByTimestampAndTransactionIdIT.java | 582 +++++++++++++++++++++
...nnerChangeStreamOrderedWithinKeyGloballyIT.java | 203 ++-----
.../it/SpannerChangeStreamOrderedWithinKeyIT.java | 1 +
...SpannerChangeStreamTransactionBoundariesIT.java | 402 ++++++++++++++
4 files changed, 1039 insertions(+), 149 deletions(-)
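The looping-timer buffering in BufferRecordsUntilOutputTimestamp can be modelled without Beam. The following is a simplified, single-key sketch under stated assumptions: event time is a plain number of seconds, all records are known up front in arrival order, no record arrives late, and an end time is always given. The class and method names are hypothetical. Each simulated timer firing emits the buffered records whose commit time is strictly before the firing time, then re-arms the timer one interval later until the firing time reaches the end time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LoopingTimerSketch {

  // Hypothetical, single-key model of BufferRecordsUntilOutputTimestamp. Event time is a
  // plain long (seconds); Beam state, timers and watermarks are not used. Assumes the input
  // is in arrival order and that no record arrives after a timer covering it has fired.
  static List<List<Long>> flush(List<Long> commitSeconds, long interval, long endSeconds) {
    if (interval <= 0) {
      throw new IllegalArgumentException("interval must be positive");
    }
    List<List<Long>> batches = new ArrayList<>();
    if (commitSeconds.isEmpty()) {
      return batches;
    }
    List<Long> buffer = new ArrayList<>(commitSeconds);
    // Step 1: the first record for the key arms the timer one interval after its timestamp.
    long timer = commitSeconds.get(0) + interval;
    while (true) {
      // The timer fires once the watermark passes `timer`, so only records strictly
      // before that time are emitted; the rest go back into the buffer.
      List<Long> out = new ArrayList<>();
      List<Long> keep = new ArrayList<>();
      for (long ts : buffer) {
        if (ts < timer) {
          out.add(ts);
        } else {
          keep.add(ts);
        }
      }
      buffer = keep;
      if (!out.isEmpty()) {
        batches.add(out);
      }
      // Steps 2 and 3: re-arm only while the firing time is before the end time.
      if (timer >= endSeconds) {
        break;
      }
      timer += interval;
    }
    return batches;
  }

  public static void main(String[] args) {
    // Records at 0s, 1s, 3s and 5s; flush every 2s; the last write commits at 5s.
    System.out.println(flush(Arrays.asList(0L, 1L, 3L, 5L), 2, 5));
  }
}
```

With records at 0, 1, 3 and 5 seconds, an interval of 2 and an end time of 5, the timer fires at 2, 4 and 6 and yields the batches [0, 1], [3] and [5], which is why the test expects one string per interval rather than one per transaction. As in the DoFn, a record committed at or after the last firing time would stay buffered and never be emitted.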
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java
new file mode 100644
index 0000000..0d70a06
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end test of Cloud Spanner Change Streams with strict commit timestamp and transaction
+ * ordering.
+ */
+@RunWith(JUnit4.class)
+public class SpannerChangeStreamOrderedByTimestampAndTransactionIdIT {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.class);
+
+ @ClassRule public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ private static String projectId;
+ private static String instanceId;
+ private static String databaseId;
+ private static String tableName;
+ private static String changeStreamName;
+ private static DatabaseClient databaseClient;
+
+ @BeforeClass
+ public static void setup() throws InterruptedException, ExecutionException, TimeoutException {
+ projectId = ENV.getProjectId();
+ instanceId = ENV.getInstanceId();
+ databaseId = ENV.getDatabaseId();
+ tableName = ENV.createSingersTable();
+ changeStreamName = ENV.createChangeStreamFor(tableName);
+ databaseClient = ENV.getDatabaseClient();
+ }
+
+ @Test
+ public void testTransactionBoundaries() {
+ final SpannerConfig spannerConfig =
+ SpannerConfig.create()
+ .withProjectId(projectId)
+ .withInstanceId(instanceId)
+ .withDatabaseId(databaseId);
+ // Commit an initial transaction to get the timestamp to start reading from.
+ List<Mutation> mutations = new ArrayList<>();
+ mutations.add(insertRecordMutation(0, "FirstName0", "LastName0"));
+ final long timeIncrementInSeconds = 2;
+ final Timestamp startTimestamp = databaseClient.write(mutations);
+ writeTransactionsToDatabase();
+
+ // Sleep the time increment interval.
+ try {
+ Thread.sleep(timeIncrementInSeconds * 1000);
+ } catch (InterruptedException e) {
+ LOG.error(e.toString(), e);
+ }
+
+ // This will be the second batch of transactions that will have strict timestamp ordering
+ // per key.
+ writeTransactionsToDatabase();
+
+ // Sleep the time increment interval.
+ try {
+ Thread.sleep(timeIncrementInSeconds * 1000);
+ } catch (InterruptedException e) {
+ LOG.error(e.toString(), e);
+ }
+
+ // This will be the final batch of transactions that will have strict timestamp ordering
+ // per key.
+ com.google.cloud.Timestamp endTimestamp = writeTransactionsToDatabase();
+
+ final PCollection<String> tokens =
+ pipeline
+ .apply(
+ SpannerIO.readChangeStream()
+ .withSpannerConfig(spannerConfig)
+ .withChangeStreamName(changeStreamName)
+ .withMetadataDatabase(databaseId)
+ .withInclusiveStartAt(startTimestamp)
+ .withInclusiveEndAt(endTimestamp))
+ .apply(
+ ParDo.of(
+ new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.KeyBySortKeyFn()))
+ .apply(
+ ParDo.of(
+ new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT
+ .CreateArtificialKeyFn()))
+ .apply(
+ ParDo.of(
+ new BufferRecordsUntilOutputTimestamp(endTimestamp, timeIncrementInSeconds)))
+ .apply(
+ ParDo.of(new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.ToStringFn()));
+
+ // Assert that the returned PCollection contains one string for each of the three buffered
+ // batches of transactions, and that within each string the data change records appear in
+ // commit timestamp and transaction ID order.
+ PAssert.that(tokens)
+ .containsInAnyOrder(
+ // Insert Singer 0 into the table.
+ "{\"SingerId\":\"0\"},INSERT\n"
+
+ // Insert Singer 1 and 2 into the table.
+ + "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"
+
+ // Delete Singer 1 and Insert Singer 3 into the table.
+ + "{\"SingerId\":\"1\"},DELETE\n"
+ + "{\"SingerId\":\"3\"},INSERT\n"
+
+ // Delete Singers 2 and 3.
+ + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n"
+
+ // Delete Singer 0.
+ + "{\"SingerId\":\"0\"},DELETE\n",
+
+ // Second batch of transactions.
+ // Insert Singer 1 and 2 into the table.
+ "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"
+
+ // Delete Singer 1 and Insert Singer 3 into the table.
+ + "{\"SingerId\":\"1\"},DELETE\n"
+ + "{\"SingerId\":\"3\"},INSERT\n"
+
+ // Delete Singers 2 and 3.
+ + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n",
+
+ // Third batch of transactions.
+ // Insert Singer 1 and 2 into the table.
+ "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"
+
+ // Delete Singer 1 and Insert Singer 3 into the table.
+ + "{\"SingerId\":\"1\"},DELETE\n"
+ + "{\"SingerId\":\"3\"},INSERT\n"
+
+ // Delete Singers 2 and 3.
+ + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n");
+
+ pipeline
+ .runWithAdditionalOptionArgs(Collections.singletonList("--streaming"))
+ .waitUntilFinish();
+ }
+
+ // KeyBySortKeyFn takes in a DataChangeRecord and outputs a key-value pair of
+ // {SortKey, DataChangeRecord}, where the SortKey holds the record's commit timestamp and
+ // server transaction ID.
+ private static class KeyBySortKeyFn
+ extends DoFn<
+ DataChangeRecord,
+ KV<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, DataChangeRecord>> {
+
+ private static final long serialVersionUID = 1270485392415293532L;
+
+ @ProcessElement
+ public void processElement(
+ @Element DataChangeRecord record,
+ OutputReceiver<
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>
+ outputReceiver) {
+ outputReceiver.output(
+ KV.of(
+ new SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey(
+ record.getCommitTimestamp(), record.getServerTransactionId()),
+ record));
+ }
+ }
+
+ // CreateArtificialKeyFn keys each input element by the same artificial (empty) byte key.
+ // Buffers and timers are scoped per key and window, and we want to buffer all data change
+ // records in a time interval together, rather than buffer them per key.
+ private static class CreateArtificialKeyFn
+ extends DoFn<
+ KV<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, DataChangeRecord>,
+ KV<
+ byte[],
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>> {
+ private static final long serialVersionUID = -3363057370822294686L;
+
+ @ProcessElement
+ public void processElement(
+ @Element
+ KV<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, DataChangeRecord>
+ element,
+ OutputReceiver<
+ KV<
+ byte[],
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>>
+ outputReceiver) {
+ outputReceiver.output(KV.of(new byte[0], element));
+ }
+ }
+
+ // Timers and buffers are per key and window.
+ // Buffer each data change record until the watermark passes the timestamp at which we want
+ // to output the buffered data change records.
+ // We use a looping timer to determine when to flush the buffer:
+ //
+ // 1. When we see the first data change record for a key (tracked with the keySeen state),
+ // we set the timer to fire one interval after that record's commit timestamp.
+ // 2. When the timer fires, if its expiration time is before the pipeline end time (or no
+ // end time is set), there may still be data left to process, so we set the next timer
+ // to the current timer's expiration time plus incrementIntervalInSeconds.
+ // 3. Otherwise, we do not set another timer.
+ //
+ private static class BufferRecordsUntilOutputTimestamp
+ extends DoFn<
+ KV<
+ byte[],
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>,
+ Iterable<
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>> {
+ private static final long serialVersionUID = 5050535558953049259L;
+
+ private final long incrementIntervalInSeconds;
+ private final @Nullable Instant pipelineEndTime;
+
+ private BufferRecordsUntilOutputTimestamp(
+ @Nullable com.google.cloud.Timestamp endTimestamp, long incrementIntervalInSeconds) {
+ this.incrementIntervalInSeconds = incrementIntervalInSeconds;
+ if (endTimestamp != null) {
+ this.pipelineEndTime = new Instant(endTimestamp.toSqlTimestamp());
+ } else {
+ pipelineEndTime = null;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ @TimerId("timer")
+ private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @SuppressWarnings("unused")
+ @StateId("buffer")
+ private final StateSpec<
+ BagState<
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>>
+ buffer = StateSpecs.bag();
+
+ @SuppressWarnings("unused")
+ @StateId("keySeen")
+ private final StateSpec<ValueState<Boolean>> keySeen = StateSpecs.value(BooleanCoder.of());
+
+ @ProcessElement
+ public void process(
+ @Element
+ KV<
+ byte[],
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>
+ element,
+ @StateId("buffer")
+ BagState<
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>
+ buffer,
+ @TimerId("timer") Timer timer,
+ @StateId("keySeen") ValueState<Boolean> keySeen) {
+ buffer.add(element.getValue());
+
+ // Only set the timer if this is the first time we are receiving a data change record
+ // with this key.
+ Boolean hasKeyBeenSeen = keySeen.read();
+ if (hasKeyBeenSeen == null) {
+ Instant commitTimestamp =
+ new Instant(element.getValue().getKey().getCommitTimestamp().toSqlTimestamp());
+ Instant outputTimestamp =
+ commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
+ LOG.debug("Setting timer at {} for key {}", outputTimestamp.toString(), element.getKey());
+ timer.set(outputTimestamp);
+ keySeen.write(true);
+ }
+ }
+
+ @OnTimer("timer")
+ public void onExpiry(
+ OnTimerContext context,
+ @StateId("buffer")
+ BagState<
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>
+ buffer,
+ @TimerId("timer") Timer timer) {
+ if (!buffer.isEmpty().read()) {
+ final List<
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>
+ records =
+ StreamSupport.stream(buffer.read().spliterator(), false)
+ .collect(Collectors.toList());
+ buffer.clear();
+
+ List<KV<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, DataChangeRecord>>
+ recordsToOutput = new ArrayList<>();
+ for (KV<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, DataChangeRecord>
+ record : records) {
+ Instant recordCommitTimestamp =
+ new Instant(record.getKey().getCommitTimestamp().toSqlTimestamp());
+ // When the watermark passes time T, all records with event time < T are expected to
+ // have arrived. Since the timer fires when the watermark passes its expiration time,
+ // we only output records with event time < expiration time; later records go back
+ // into the buffer.
+ final String recordString = getRecordString(record.getValue());
+ if (recordCommitTimestamp.isBefore(context.timestamp())) {
+ LOG.debug(
+ "Outputting transactions {} with id {} at expiration timestamp {}",
+ recordString,
+ record.getKey().toString(),
+ context.timestamp().toString());
+ recordsToOutput.add(record);
+ } else {
+ LOG.debug(
+ "Expired at {} but adding transaction {} back to buffer "
+ + "due to commit timestamp {}",
+ context.timestamp().toString(),
+ recordString,
+ recordCommitTimestamp.toString());
+ buffer.add(record);
+ }
+ }
+
+ // Output records, if there are any to output.
+ if (!recordsToOutput.isEmpty()) {
+ context.outputWithTimestamp(recordsToOutput, context.timestamp());
+ LOG.debug(
+ "Expired at {}, outputting records for key {}",
+ context.timestamp().toString(),
+ recordsToOutput.get(0).getKey().toString());
+ } else {
+ LOG.debug("Expired at {} with no records", context.timestamp().toString());
+ }
+ }
+
+ Instant nextTimer =
+ context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
+ if (pipelineEndTime == null || context.timestamp().isBefore(pipelineEndTime)) {
+ // If the current timer's timestamp is before the pipeline end time, or there is no
+ // pipeline end time, we still have data left to process.
+ LOG.debug("Setting next timer to {}", nextTimer.toString());
+ timer.set(nextTimer);
+ } else {
+ LOG.debug(
+ "Timer not being set as exceeded pipeline end time: " + pipelineEndTime.toString());
+ }
+ }
+ }
+
+ // ToStringFn takes in an iterable of key-value pairs of {SortKey, DataChangeRecord}, sorts
+ // them by SortKey, and outputs a string representation.
+ private static class ToStringFn
+ extends DoFn<
+ Iterable<
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>,
+ String> {
+
+ private static final long serialVersionUID = 2307936669684679038L;
+
+ @ProcessElement
+ public void processElement(
+ @Element
+ Iterable<
+ KV<
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey,
+ DataChangeRecord>>
+ element,
+ OutputReceiver<String> outputReceiver) {
+ final StringBuilder builder = new StringBuilder();
+
+ List<KV<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey, DataChangeRecord>>
+ sortedTransactions =
+ StreamSupport.stream(element.spliterator(), false)
+ .sorted((kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey()))
+ .collect(Collectors.toList());
+
+ sortedTransactions.forEach(
+ record -> {
+ builder.append(getRecordString(record.getValue()));
+ });
+ outputReceiver.output(builder.toString());
+ }
+ }
+
+ // Get a string representation of the mods and the mod type in the data change record.
+ private static String getRecordString(DataChangeRecord record) {
+ final StringBuilder builder = new StringBuilder();
+ String modString = "";
+ for (Mod mod : record.getMods()) {
+ modString += mod.getKeysJson();
+ }
+ builder.append(String.join(",", modString, record.getModType().toString()));
+ builder.append("\n");
+ return builder.toString();
+ }
+
+ private Timestamp writeTransactionsToDatabase() {
+ List<Mutation> mutations = new ArrayList<>();
+
+ // 1. Commit a transaction to insert Singer 1 and Singer 2 into the table.
+ mutations.add(insertRecordMutation(1, "FirstName1", "LastName1"));
+ mutations.add(insertRecordMutation(2, "FirstName2", "LastName2"));
+ Timestamp t1 = databaseClient.write(mutations);
+ LOG.debug("The first transaction committed with timestamp: " + t1.toString());
+ mutations.clear();
+
+ // 2. Commit a transaction to insert Singer 3 and remove Singer 1 from the table.
+ mutations.add(insertRecordMutation(3, "FirstName3", "LastName3"));
+ mutations.add(deleteRecordMutation(1));
+ Timestamp t2 = databaseClient.write(mutations);
+ LOG.debug("The second transaction committed with timestamp: " + t2.toString());
+ mutations.clear();
+
+ // 3. Commit a transaction to delete Singer 2 and Singer 3 from the table.
+ mutations.add(deleteRecordMutation(2));
+ mutations.add(deleteRecordMutation(3));
+ Timestamp t3 = databaseClient.write(mutations);
+ LOG.debug("The third transaction committed with timestamp: " + t3.toString());
+ mutations.clear();
+
+ // 4. Commit a transaction to delete Singer 0.
+ mutations.add(deleteRecordMutation(0));
+ Timestamp t4 = databaseClient.write(mutations);
+ LOG.debug("The fourth transaction committed with timestamp: " + t4.toString());
+ return t4;
+ }
+
+ // Create an insert mutation.
+ private static Mutation insertRecordMutation(long singerId, String firstName, String lastName) {
+ return Mutation.newInsertBuilder(tableName)
+ .set("SingerId")
+ .to(singerId)
+ .set("FirstName")
+ .to(firstName)
+ .set("LastName")
+ .to(lastName)
+ .build();
+ }
+
+ // Create a delete mutation.
+ private static Mutation deleteRecordMutation(long singerId) {
+ return Mutation.delete(tableName, KeySet.newBuilder().addKey(Key.of(singerId)).build());
+ }
+
+ private static class SortKey
+ implements Serializable,
+ Comparable<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey> {
+
+ private static final long serialVersionUID = 2105939115467195036L;
+
+ private Timestamp commitTimestamp;
+ private String transactionId;
+
+ public SortKey() {}
+
+ public SortKey(Timestamp commitTimestamp, String transactionId) {
+ this.commitTimestamp = commitTimestamp;
+ this.transactionId = transactionId;
+ }
+
+ public static long getSerialVersionUID() {
+ return serialVersionUID;
+ }
+
+ public Timestamp getCommitTimestamp() {
+ return commitTimestamp;
+ }
+
+ public void setCommitTimestamp(Timestamp commitTimestamp) {
+ this.commitTimestamp = commitTimestamp;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey sortKey =
+ (SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey) o;
+ return Objects.equals(commitTimestamp, sortKey.commitTimestamp)
+ && Objects.equals(transactionId, sortKey.transactionId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(commitTimestamp, transactionId);
+ }
+
+ @Override
+ public int compareTo(SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey other) {
+ return Comparator
+ .<SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.SortKey>comparingDouble(
+ sortKey ->
+ sortKey.getCommitTimestamp().getSeconds()
+ + sortKey.getCommitTimestamp().getNanos() / 1000000000.0)
+ .thenComparing(sortKey -> sortKey.getTransactionId())
+ .compare(this, other);
+ }
+ }
+}
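Both SortKey classes in this commit order commit timestamps with comparingDouble over getSeconds() + getNanos() / 1000000000.0. Near current epoch values (about 1.6e9 seconds), adjacent doubles are 2^-22 s (roughly 238 ns) apart, so two commit timestamps closer than that can compare as equal and fall through to the transaction-ID tiebreak. The sketch below illustrates the difference; it is not code from the commit. ExactSortKey is a hypothetical name, and its long seconds / int nanos fields are assumed stand-ins for com.google.cloud.Timestamp#getSeconds() and #getNanos().

```java
import java.util.Comparator;

// Hypothetical stand-in for the test's SortKey. The commit timestamp is kept as
// (seconds, nanos), mirroring com.google.cloud.Timestamp#getSeconds()/#getNanos().
final class ExactSortKey implements Comparable<ExactSortKey> {
  final long seconds;
  final int nanos;
  final String transactionId;

  ExactSortKey(long seconds, int nanos, String transactionId) {
    this.seconds = seconds;
    this.nanos = nanos;
    this.transactionId = transactionId;
  }

  // Exact ordering: seconds, then nanos, then transaction ID; no floating point involved.
  static final Comparator<ExactSortKey> EXACT =
      Comparator.<ExactSortKey>comparingLong(k -> k.seconds)
          .thenComparingInt(k -> k.nanos)
          .thenComparing(k -> k.transactionId);

  // The ordering used in the diff: fold the timestamp into a single double first.
  static final Comparator<ExactSortKey> AS_DOUBLE =
      Comparator.<ExactSortKey>comparingDouble(k -> k.seconds + k.nanos / 1_000_000_000.0)
          .thenComparing(k -> k.transactionId);

  @Override
  public int compareTo(ExactSortKey other) {
    return EXACT.compare(this, other);
  }
}
```

For two commits 1 ns apart at seconds = 1647570000, EXACT puts the earlier one first, while AS_DOUBLE sees identical doubles and orders them by transaction ID alone. Because com.google.cloud.Timestamp is itself Comparable, comparing on the Timestamp directly would be another way to keep the ordering exact.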
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java
index fc7ff71..78f742f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.java
@@ -31,11 +31,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
-import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
@@ -98,7 +96,7 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
.withDatabaseId(databaseId);
// Get the time increment interval at which to flush data changes ordered by key.
- final long timeIncrementInSeconds = 70;
+ final long timeIncrementInSeconds = 2;
// Commit a initial transaction to get the timestamp to start reading from.
List<Mutation> mutations = new ArrayList<>();
@@ -113,7 +111,7 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
try {
Thread.sleep(timeIncrementInSeconds * 1000);
} catch (InterruptedException e) {
- System.out.println(e);
+ LOG.error(e.toString(), e);
}
// This will be the second batch of transactions that will have strict timestamp ordering
@@ -124,14 +122,14 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
try {
Thread.sleep(timeIncrementInSeconds * 1000);
} catch (InterruptedException e) {
- System.out.println(e);
+ LOG.error(e.toString(), e);
}
// This will be the final batch of transactions that will have strict timestamp ordering
// per key.
com.google.cloud.Timestamp endTimestamp = writeTransactionsToDatabase();
- LOG.debug(
+ LOG.info(
"Reading change streams from {} to {}", startTimestamp.toString(), endTimestamp.toString());
final PCollection<String> tokens =
@@ -146,8 +144,7 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
.apply(ParDo.of(new BreakRecordByModFn()))
.apply(ParDo.of(new KeyByIdFn()))
.apply(ParDo.of(new KeyValueByCommitTimestampAndTransactionIdFn<>()))
- .apply(
- ParDo.of(new BufferKeyUntilOutputTimestamp(endTimestamp, timeIncrementInSeconds)))
+ .apply(ParDo.of(new BufferKeyUntilOutputTimestamp(timeIncrementInSeconds)))
.apply(ParDo.of(new ToStringFn()));
// Assert that the returned PCollection contains one entry per key for the committed
@@ -164,73 +161,39 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
+ "Deleted record;",
"{\"SingerId\":\"1\"}\n"
+ "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 1\"};"
- + "Deleted record;"
- + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};"
+ "Deleted record;",
"{\"SingerId\":\"2\"}\n"
+ "{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 2\"};"
+ "Deleted record;",
"{\"SingerId\":\"3\"}\n"
+ "{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 3\"};"
- + "Deleted record;",
- "{\"SingerId\":\"4\"}\n"
- + "{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};"
- + "Deleted record;",
- "{\"SingerId\":\"5\"}\n"
- + "{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 5\"};"
+ "Deleted record;",
// Second batch of records ordered within key.
"{\"SingerId\":\"1\"}\n"
+ "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 1\"};"
- + "Deleted record;"
- + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};"
+ "Deleted record;",
"{\"SingerId\":\"2\"}\n"
+ "{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 2\"};"
+ "Deleted record;",
"{\"SingerId\":\"3\"}\n"
+ "{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 3\"};"
- + "Deleted record;",
- "{\"SingerId\":\"4\"}\n"
- + "{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};"
- + "Deleted record;",
- "{\"SingerId\":\"5\"}\n"
- + "{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 5\"};"
+ "Deleted record;",
// Third batch of records ordered within key.
"{\"SingerId\":\"1\"}\n"
+ "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 1\"};"
- + "Deleted record;"
- + "{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};"
+ "Deleted record;",
"{\"SingerId\":\"2\"}\n"
+ "{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 2\"};"
+ "Deleted record;",
"{\"SingerId\":\"3\"}\n"
+ "{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 3\"};"
- + "Deleted record;",
- "{\"SingerId\":\"4\"}\n"
- + "{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};"
- + "Deleted record;",
- "{\"SingerId\":\"5\"}\n"
- + "{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};"
- + "{\"FirstName\":\"Updating mutation 5\"};"
+ "Deleted record;");
- pipeline.run().waitUntilFinish();
+ pipeline
+ .runWithAdditionalOptionArgs(Collections.singletonList("--streaming"))
+ .waitUntilFinish();
}
// Data change records may contain multiple mods if there are multiple primary keys.
@@ -241,22 +204,6 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
@ProcessElement
public void processElement(
@Element DataChangeRecord record, OutputReceiver<DataChangeRecord> outputReceiver) {
- final ChangeStreamRecordMetadata fakeChangeStreamMetadata =
- ChangeStreamRecordMetadata.newBuilder()
- .withPartitionToken("1")
- .withRecordTimestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(2L))
- .withPartitionStartTimestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(3L))
- .withPartitionEndTimestamp(com.google.cloud.Timestamp.ofTimeMicroseconds(4L))
- .withPartitionCreatedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(5L))
- .withPartitionScheduledAt(com.google.cloud.Timestamp.ofTimeMicroseconds(6L))
- .withPartitionRunningAt(com.google.cloud.Timestamp.ofTimeMicroseconds(7L))
- .withQueryStartedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(8L))
- .withRecordStreamStartedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(9L))
- .withRecordStreamEndedAt(com.google.cloud.Timestamp.ofTimeMicroseconds(10L))
- .withRecordReadAt(com.google.cloud.Timestamp.ofTimeMicroseconds(11L))
- .withTotalStreamTimeMillis(12L)
- .withNumberOfRecordsRead(13L)
- .build();
record.getMods().stream()
.map(
mod ->
@@ -273,7 +220,7 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
record.getValueCaptureType(),
record.getNumberOfRecordsInTransaction(),
record.getNumberOfPartitionsInTransaction(),
- fakeChangeStreamMetadata))
+ record.getMetadata()))
.forEach(outputReceiver::output);
}
}
@@ -332,16 +279,9 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
private static final long serialVersionUID = 5050535558953049259L;
private final long incrementIntervalInSeconds;
- private final @Nullable Instant pipelineEndTime;
- private BufferKeyUntilOutputTimestamp(
- @Nullable com.google.cloud.Timestamp endTimestamp, long incrementIntervalInSeconds) {
+ private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
this.incrementIntervalInSeconds = incrementIntervalInSeconds;
- if (endTimestamp != null) {
- this.pipelineEndTime = new Instant(endTimestamp.toSqlTimestamp());
- } else {
- pipelineEndTime = null;
- }
}
@SuppressWarnings("unused")
@@ -355,8 +295,8 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
buffer = StateSpecs.bag();
@SuppressWarnings("unused")
- @StateId("keySeen")
- private final StateSpec<ValueState<Boolean>> keySeen = StateSpecs.value(BooleanCoder.of());
+ @StateId("seenKey")
+ private final StateSpec<ValueState<String>> seenKey = StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void process(
@@ -367,20 +307,20 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
BagState<KV<SpannerChangeStreamOrderedWithinKeyGloballyIT.SortKey, DataChangeRecord>>
buffer,
@TimerId("timer") Timer timer,
- @StateId("keySeen") ValueState<Boolean> keySeen) {
+ @StateId("seenKey") ValueState<String> seenKey) {
buffer.add(element.getValue());
// Only set the timer if this is the first time we are receiving a data change record
// with this key.
- Boolean hasKeyBeenSeen = keySeen.read();
+ String hasKeyBeenSeen = seenKey.read();
if (hasKeyBeenSeen == null) {
Instant commitTimestamp =
new Instant(element.getValue().getValue().getCommitTimestamp().toSqlTimestamp());
Instant outputTimestamp =
commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
- LOG.debug("Setting timer at {} for key {}", outputTimestamp.toString(), element.getKey());
+ LOG.info("Setting timer at {} for key {}", outputTimestamp.toString(), element.getKey());
timer.set(outputTimestamp);
- keySeen.write(true);
+ seenKey.write(element.getKey());
}
}
@@ -390,7 +330,14 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
@StateId("buffer")
BagState<KV<SpannerChangeStreamOrderedWithinKeyGloballyIT.SortKey, DataChangeRecord>>
buffer,
- @TimerId("timer") Timer timer) {
+ @TimerId("timer") Timer timer,
+ @StateId("seenKey") ValueState<String> seenKey) {
+ String keyForTimer = seenKey.read();
+ Instant timerContextTimestamp = context.timestamp();
+ LOG.info(
+ "Timer reached expiration time for key {} and for timestamp {}",
+ keyForTimer,
+ timerContextTimestamp);
if (!buffer.isEmpty().read()) {
final List<KV<SpannerChangeStreamOrderedWithinKeyGloballyIT.SortKey, DataChangeRecord>>
records =
@@ -412,18 +359,18 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
// have been processed and successfully committed. Since the timer fires when the
// watermark passes the expiration time, we should only output records with event time
// < expiration time.
- if (recordCommitTimestamp.isBefore(context.timestamp())) {
- LOG.debug(
+ if (recordCommitTimestamp.isBefore(timerContextTimestamp)) {
+ LOG.info(
"Outputting record with key {} and value \"{}\" at expiration timestamp {}",
record.getValue().getMods().get(0).getKeysJson(),
recordString,
- context.timestamp().toString());
+ timerContextTimestamp.toString());
recordsToOutput.add(record);
} else {
- LOG.debug(
+ LOG.info(
"Expired at {} but adding record with key {} and value {} back to buffer "
+ "due to commit timestamp {}",
- context.timestamp().toString(),
+ timerContextTimestamp.toString(),
record.getValue().getMods().get(0).getKeysJson(),
recordString,
recordCommitTimestamp.toString());
@@ -437,26 +384,24 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
KV.of(
recordsToOutput.get(0).getValue().getMods().get(0).getKeysJson(),
recordsToOutput),
- context.timestamp());
- LOG.debug(
- "Expired at {}, outputting records for key {}",
- context.timestamp().toString(),
+ timerContextTimestamp);
+ LOG.info(
+ "Expired at {}, outputting records for key {}",
+ timerContextTimestamp.toString(),
recordsToOutput.get(0).getValue().getMods().get(0).getKeysJson());
} else {
- LOG.debug("Expired at {} with no records", context.timestamp().toString());
+ LOG.info("Expired at {} with no records", timerContextTimestamp.toString());
}
}
Instant nextTimer =
- context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
- if (pipelineEndTime == null || context.timestamp().isBefore(pipelineEndTime)) {
- // If the current timer's timestamp is before the pipeline end time, or there is no
- // pipeline end time, we still have data left to process.
- LOG.debug("Setting next timer to {}", nextTimer.toString());
+ timerContextTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
+ if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
+ LOG.info("Setting next timer to {} for key {}", nextTimer.toString(), keyForTimer);
timer.set(nextTimer);
} else {
- LOG.debug(
- "Timer not being set as exceeded pipeline end time: " + pipelineEndTime.toString());
+ LOG.info("Timer not being set since the buffer is empty for key {} ", keyForTimer);
+ seenKey.clear();
}
}
}
@@ -546,75 +491,35 @@ public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
}
}
- private static com.google.cloud.Timestamp writeTransactionsToDatabase() {
+ private com.google.cloud.Timestamp writeTransactionsToDatabase() {
List<Mutation> mutations = new ArrayList<>();
// 1. Commit a transaction to insert Singer 1 and Singer 2 into the table.
mutations.add(insertRecordMutation(1));
mutations.add(insertRecordMutation(2));
com.google.cloud.Timestamp t1 = databaseClient.write(mutations);
- LOG.debug("The first transaction committed with timestamp: " + t1.toString());
- mutations.clear();
-
- // 2. Commmit a transaction to insert Singer 4 and remove Singer 1 from the table.
- mutations.add(updateRecordMutation(1));
- mutations.add(insertRecordMutation(4));
- com.google.cloud.Timestamp t2 = databaseClient.write(mutations);
- LOG.debug("The second transaction committed with timestamp: " + t2.toString());
+ LOG.info("The first transaction committed with timestamp: " + t1.toString());
mutations.clear();
- // 3. Commit a transaction to insert Singer 3 and Singer 5.
- mutations.add(deleteRecordMutation(1));
+ // 2. Commit a transaction to insert Singer 3 and remove Singer 1 from the table.
mutations.add(insertRecordMutation(3));
- mutations.add(insertRecordMutation(5));
- mutations.add(updateRecordMutation(5));
- com.google.cloud.Timestamp t3 = databaseClient.write(mutations);
- LOG.debug("The third transaction committed with timestamp: " + t3.toString());
- mutations.clear();
-
- // 4. Commit a transaction to update Singer 3 and Singer 2 in the table.
- mutations.add(updateRecordMutation(3));
- mutations.add(updateRecordMutation(2));
- com.google.cloud.Timestamp t4 = databaseClient.write(mutations);
- LOG.debug("The fourth transaction committed with timestamp: " + t4.toString());
+ mutations.add(deleteRecordMutation(1));
+ com.google.cloud.Timestamp t2 = databaseClient.write(mutations);
+ LOG.info("The second transaction committed with timestamp: " + t2.toString());
mutations.clear();
- // 5. Commit a transaction to delete 4, insert 1, delete 3, update 5.
- mutations.add(deleteRecordMutation(4));
- mutations.add(insertRecordMutation(1));
+ // 3. Commit a transaction to delete Singer 2 and Singer 3 from the table.
+ mutations.add(deleteRecordMutation(2));
mutations.add(deleteRecordMutation(3));
- mutations.add(updateRecordMutation(5));
- com.google.cloud.Timestamp t5 = databaseClient.write(mutations);
-
- LOG.debug("The fifth transaction committed with timestamp: " + t5.toString());
- mutations.clear();
-
- // 6. Commit a transaction to delete Singers 5, insert singers 6.
- mutations.add(deleteRecordMutation(5));
- mutations.add(insertRecordMutation(6));
- mutations.add(deleteRecordMutation(6));
- com.google.cloud.Timestamp t6 = databaseClient.write(mutations);
- LOG.debug("The sixth transaction committed with timestamp: " + t6.toString());
+ com.google.cloud.Timestamp t3 = databaseClient.write(mutations);
+ LOG.info("The third transaction committed with timestamp: " + t3.toString());
mutations.clear();
- // 7. Delete remaining rows from database.
- mutations.add(deleteRecordMutation(1));
- mutations.add(deleteRecordMutation(2));
+ // 4. Commit a transaction to delete Singer 0.
mutations.add(deleteRecordMutation(0));
- com.google.cloud.Timestamp t7 = databaseClient.write(mutations);
- LOG.debug("The seventh transaction committed with timestamp: " + t7.toString());
-
- return t7;
- }
-
- // Create an update mutation.
- private static Mutation updateRecordMutation(long singerId) {
- return Mutation.newUpdateBuilder(tableName)
- .set("SingerId")
- .to(singerId)
- .set("FirstName")
- .to("Updating mutation " + singerId)
- .build();
+ com.google.cloud.Timestamp t4 = databaseClient.write(mutations);
+ LOG.info("The fourth transaction committed with timestamp: " + t4.toString());
+ return t4;
}
// Create an insert mutation.
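After this change, BufferKeyUntilOutputTimestamp no longer needs the pipeline end time. Each timer firing emits the buffered records whose commit timestamp is strictly before the timer's timestamp, puts the rest back into the buffer, and re-arms the timer incrementIntervalInSeconds later only while the buffer is still non-empty; otherwise it clears the seenKey state. The following is a plain-Java simulation of one firing for a single key, written for illustration only. It does not use Beam's state or timer APIs, TimerFlushSketch, Result and onTimer are hypothetical names, and commit times are assumed to be epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java simulation (no Beam state or timer APIs) of one timer firing in
// BufferKeyUntilOutputTimestamp. All names here are hypothetical.
final class TimerFlushSketch {

  // What one firing produces for a single key.
  static final class Result {
    final List<Long> emitted = new ArrayList<>();
    final List<Long> retained = new ArrayList<>();
    // null means the timer is not re-armed and the seen-key marker would be cleared.
    Long nextTimerMillis;
  }

  static Result onTimer(List<Long> bufferedCommitMillis, long timerMillis, long incrementMillis) {
    List<Long> sorted = new ArrayList<>(bufferedCommitMillis);
    Collections.sort(sorted);
    Result result = new Result();
    for (long commit : sorted) {
      // Only records committed strictly before the timer's timestamp are emitted;
      // later ones go back into the buffer for a future firing.
      if (commit < timerMillis) {
        result.emitted.add(commit);
      } else {
        result.retained.add(commit);
      }
    }
    // Re-arm only while something is still buffered for this key.
    if (!result.retained.isEmpty()) {
      result.nextTimerMillis = timerMillis + incrementMillis;
    }
    return result;
  }
}
```

Tying re-arming to a non-empty buffer, instead of to a known end timestamp, is what lets the test run as a streaming pipeline without passing endTimestamp into the DoFn.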
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java
index 53566f0..9758cad 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyIT.java
@@ -83,6 +83,7 @@ public class SpannerChangeStreamOrderedWithinKeyIT {
@Test
public void testOrderedWithinKey() {
+ LOG.info("Test pipeline: " + pipeline.toString());
final SpannerConfig spannerConfig =
SpannerConfig.create()
.withProjectId(projectId)
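The new SpannerChangeStreamTransactionBoundariesIT reassembles whole transactions: KeyByTransactionIdFn keys each DataChangeRecord by its server transaction ID, and TransactionBoundaryFn buffers records until the group size reaches getNumberOfRecordsInTransaction(), then sorts them by record sequence. A plain-Java sketch of that buffering step follows, under stated assumptions: a HashMap stands in for Beam's per-key BagState, Rec is a hypothetical stand-in for DataChangeRecord, and record sequences are compared as strings, as the test's Comparator.comparing(DataChangeRecord::getRecordSequence) does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of TransactionBoundaryFn's buffering. A HashMap stands in for
// Beam's per-key BagState; Rec is a hypothetical stand-in for DataChangeRecord.
final class TransactionBoundarySketch {

  static final class Rec {
    final String transactionId;
    final String recordSequence;
    final int recordsInTransaction;

    Rec(String transactionId, String recordSequence, int recordsInTransaction) {
      this.transactionId = transactionId;
      this.recordSequence = recordSequence;
      this.recordsInTransaction = recordsInTransaction;
    }
  }

  // One buffer per transaction ID, like keyed state after KeyByTransactionIdFn.
  private final Map<String, List<Rec>> buffers = new HashMap<>();

  // Buffers rec; once every record of its transaction has arrived, removes the group
  // and returns it sorted by record sequence (compared as strings).
  Optional<List<Rec>> add(Rec rec) {
    List<Rec> buffer = buffers.computeIfAbsent(rec.transactionId, id -> new ArrayList<>());
    buffer.add(rec);
    if (buffer.size() < rec.recordsInTransaction) {
      return Optional.empty();
    }
    buffers.remove(rec.transactionId);
    buffer.sort(Comparator.comparing((Rec r) -> r.recordSequence));
    return Optional.of(buffer);
  }
}
```

Using the buffer size instead of a separate counter is a simplification for the sketch; the test itself keeps the count in a ValueState<Integer> next to the BagState.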
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java
new file mode 100644
index 0000000..eb5b9e3
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** End-to-end test of Cloud Spanner Change Streams Transaction Boundaries. */
+@RunWith(JUnit4.class)
+public class SpannerChangeStreamTransactionBoundariesIT {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SpannerChangeStreamTransactionBoundariesIT.class);
+
+ @ClassRule public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ private static String projectId;
+ private static String instanceId;
+ private static String databaseId;
+ private static String tableName;
+ private static String changeStreamName;
+ private static DatabaseClient databaseClient;
+
+ @BeforeClass
+ public static void setup() throws InterruptedException, ExecutionException, TimeoutException {
+ projectId = ENV.getProjectId();
+ instanceId = ENV.getInstanceId();
+ databaseId = ENV.getDatabaseId();
+ tableName = ENV.createSingersTable();
+ changeStreamName = ENV.createChangeStreamFor(tableName);
+ databaseClient = ENV.getDatabaseClient();
+ }
+
+ @Test
+ public void testTransactionBoundaries() {
+ LOG.info("Test pipeline: " + pipeline.toString());
+ final SpannerConfig spannerConfig =
+ SpannerConfig.create()
+ .withProjectId(projectId)
+ .withInstanceId(instanceId)
+ .withDatabaseId(databaseId);
+
+ // Commit an initial transaction to get the timestamp to start reading from.
+ List<Mutation> mutations = new ArrayList<>();
+ mutations.add(insertRecordMutation(0, "FirstName0", "LastName0"));
+ final Timestamp startTimestamp = databaseClient.write(mutations);
+
+ // Use the commit timestamp of the last transaction written as the end timestamp.
+ final Timestamp endTimestamp = writeTransactionsToDatabase();
+
+ final PCollection<String> tokens =
+ pipeline
+ .apply(
+ SpannerIO.readChangeStream()
+ .withSpannerConfig(spannerConfig)
+ .withChangeStreamName(changeStreamName)
+ .withMetadataDatabase(databaseId)
+ .withInclusiveStartAt(startTimestamp)
+ .withInclusiveEndAt(endTimestamp))
+ .apply(ParDo.of(new SpannerChangeStreamTransactionBoundariesIT.KeyByTransactionIdFn()))
+ .apply(ParDo.of(new SpannerChangeStreamTransactionBoundariesIT.TransactionBoundaryFn()))
+ .apply(ParDo.of(new SpannerChangeStreamTransactionBoundariesIT.ToStringFn()));
+
+ // Assert that the returned PCollection contains all eight transactions (in string representation)
+ // and that each transaction contains, in order, the list of mutations added.
+ PAssert.that(tokens)
+ .containsInAnyOrder(
+ // Insert Singer 0 into the table.
+ "{\"SingerId\":\"0\"},INSERT\n",
+
+ // Insert Singers 1 and 2 into the table.
+ "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n",
+
+ // Delete Singer 1 and insert Singer 3 into the table.
+ "{\"SingerId\":\"1\"},DELETE\n" + "{\"SingerId\":\"3\"},INSERT\n",
+
+ // Insert Singers 4, 5, 6 into the table.
+ "{\"SingerId\":\"4\"}{\"SingerId\":\"5\"}{\"SingerId\":\"6\"},INSERT\n",
+
+ // Update Singer 6 and insert Singer 7.
+ "{\"SingerId\":\"6\"},UPDATE\n" + "{\"SingerId\":\"7\"},INSERT\n",
+
+ // Update Singers 4 and 5 in the table.
+ "{\"SingerId\":\"4\"}{\"SingerId\":\"5\"},UPDATE\n",
+
+ // Delete Singers 3, 4, 5 from the table.
+ "{\"SingerId\":\"3\"}{\"SingerId\":\"4\"}{\"SingerId\":\"5\"},DELETE\n",
+
+ // Delete Singers 0, 2, 6, 7.
+ "{\"SingerId\":\"0\"}{\"SingerId\":\"2\"}{\"SingerId\":\"6\"}"
+ + "{\"SingerId\":\"7\"},DELETE\n");
+
+ final PipelineResult pipelineResult = pipeline.run();
+ pipelineResult.waitUntilFinish();
+ }
+
+ // KeyByTransactionIdFn takes in a DataChangeRecord and outputs a key-value pair of
+ // {TransactionId, DataChangeRecord}.
+ private static class KeyByTransactionIdFn
+ extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
+
+ private static final long serialVersionUID = 1270485392415293532L;
+
+ @ProcessElement
+ public void processElement(
+ @Element DataChangeRecord record,
+ OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
+ outputReceiver.output(KV.of(record.getServerTransactionId(), record));
+ }
+ }
+
+ // TransactionBoundaryFn receives the {TransactionId, DataChangeRecord} key-value pairs
+ // produced by KeyByTransactionIdFn and buffers them in groups by TransactionId.
+ // When the number of records buffered equals the number of records in the entire
+ // transaction, it sorts the group's DataChangeRecords by record sequence and outputs
+ // a key-value pair of {SortKey(CommitTimestamp, TransactionId), Iterable<DataChangeRecord>}.
+ private static class TransactionBoundaryFn
+ extends DoFn<
+ KV<String, DataChangeRecord>,
+ KV<SpannerChangeStreamTransactionBoundariesIT.SortKey, Iterable<DataChangeRecord>>> {
+
+ private static final long serialVersionUID = 5050535558953049259L;
+
+ @SuppressWarnings("UnusedVariable")
+ @StateId("buffer")
+ private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
+
+ @SuppressWarnings("UnusedVariable")
+ @StateId("count")
+ private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
+
+ @ProcessElement
+ public void process(
+ ProcessContext context,
+ @StateId("buffer") BagState<DataChangeRecord> buffer,
+ @StateId("count") ValueState<Integer> countState) {
+ final KV<String, DataChangeRecord> element = context.element();
+ final DataChangeRecord record = element.getValue();
+
+ buffer.add(record);
+ int count = (countState.read() != null ? countState.read() : 0);
+ count = count + 1;
+ countState.write(count);
+
+ if (count == record.getNumberOfRecordsInTransaction()) {
+ final List<DataChangeRecord> sortedRecords =
+ StreamSupport.stream(buffer.read().spliterator(), false)
+ .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
+ .collect(Collectors.toList());
+
+ final Instant commitInstant =
+ new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp().getTime());
+ context.outputWithTimestamp(
+ KV.of(
+ new SpannerChangeStreamTransactionBoundariesIT.SortKey(
+ sortedRecords.get(0).getCommitTimestamp(),
+ sortedRecords.get(0).getServerTransactionId()),
+ sortedRecords),
+ commitInstant);
+ buffer.clear();
+ countState.clear();
+ }
+ }
+ }
+
+ // ToStringFn takes in a key-value pair of {SortKey, Iterable<DataChangeRecord>} and outputs
+ // its string representation.
+ private static class ToStringFn
+ extends DoFn<
+ KV<SpannerChangeStreamTransactionBoundariesIT.SortKey, Iterable<DataChangeRecord>>,
+ String> {
+
+ private static final long serialVersionUID = 2307936669684679038L;
+
+ @ProcessElement
+ public void processElement(
+ @Element
+ KV<SpannerChangeStreamTransactionBoundariesIT.SortKey, Iterable<DataChangeRecord>>
+ element,
+ OutputReceiver<String> outputReceiver) {
+ final StringBuilder builder = new StringBuilder();
+ final Iterable<DataChangeRecord> sortedRecords = element.getValue();
+ sortedRecords.forEach(
+ record -> {
+ // Output the string representation of the mods and the mod type for each data change
+ // record.
+ String modString = "";
+ for (Mod mod : record.getMods()) {
+ modString += mod.getKeysJson();
+ }
+ builder.append(String.join(",", modString, record.getModType().toString()));
+ builder.append("\n");
+ });
+ outputReceiver.output(builder.toString());
+ }
+ }
+
+ private static class SortKey
+ implements Serializable, Comparable<SpannerChangeStreamTransactionBoundariesIT.SortKey> {
+
+ private static final long serialVersionUID = 2105939115467195036L;
+
+ private Timestamp commitTimestamp;
+ private String transactionId;
+
+ public SortKey() {}
+
+ public SortKey(Timestamp commitTimestamp, String transactionId) {
+ this.commitTimestamp = commitTimestamp;
+ this.transactionId = transactionId;
+ }
+
+ public static long getSerialVersionUID() {
+ return serialVersionUID;
+ }
+
+ public Timestamp getCommitTimestamp() {
+ return commitTimestamp;
+ }
+
+ public void setCommitTimestamp(Timestamp commitTimestamp) {
+ this.commitTimestamp = commitTimestamp;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SpannerChangeStreamTransactionBoundariesIT.SortKey sortKey =
+ (SpannerChangeStreamTransactionBoundariesIT.SortKey) o;
+ return Objects.equals(commitTimestamp, sortKey.commitTimestamp)
+ && Objects.equals(transactionId, sortKey.transactionId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(commitTimestamp, transactionId);
+ }
+
+ @Override
+ public int compareTo(SpannerChangeStreamTransactionBoundariesIT.SortKey other) {
+ // Compare the Timestamps directly: converting them to double seconds can round away
+ // sub-microsecond differences between commit timestamps.
+ return Comparator.comparing(
+ SpannerChangeStreamTransactionBoundariesIT.SortKey::getCommitTimestamp)
+ .thenComparing(SpannerChangeStreamTransactionBoundariesIT.SortKey::getTransactionId)
+ .compare(this, other);
+ }
+ }
+
+ private Timestamp writeTransactionsToDatabase() {
+ List<Mutation> mutations = new ArrayList<>();
+
+ // 1. Commit a transaction to insert Singer 1 and Singer 2 into the table.
+ mutations.add(insertRecordMutation(1, "FirstName1", "LastName1"));
+ mutations.add(insertRecordMutation(2, "FirstName2", "LastName2"));
+ Timestamp t1 = databaseClient.write(mutations);
+ LOG.debug("The first transaction committed with timestamp: " + t1.toString());
+ mutations.clear();
+
+ // 2. Commit a transaction to insert Singer 3 and remove Singer 1 from the table.
+ mutations.add(insertRecordMutation(3, "FirstName3", "LastName3"));
+ mutations.add(deleteRecordMutation(1));
+ Timestamp t2 = databaseClient.write(mutations);
+ LOG.debug("The second transaction committed with timestamp: " + t2.toString());
+ mutations.clear();
+
+ // 3. Commit a transaction to insert Singers 4, 5, and 6 into the table.
+ mutations.add(insertRecordMutation(4, "FirstName4", "LastName4"));
+ mutations.add(insertRecordMutation(5, "FirstName5", "LastName5"));
+ mutations.add(insertRecordMutation(6, "FirstName6", "LastName6"));
+ Timestamp t3 = databaseClient.write(mutations);
+ LOG.debug("The third transaction committed with timestamp: " + t3.toString());
+ mutations.clear();
+
+ // 4. Commit a transaction to insert Singer 7 and update Singer 6 in the table.
+ mutations.add(insertRecordMutation(7, "FirstName7", "LastName7"));
+ mutations.add(updateRecordMutation(6, "FirstName5", "LastName5"));
+ Timestamp t4 = databaseClient.write(mutations);
+ LOG.debug("The fourth transaction committed with timestamp: " + t4.toString());
+ mutations.clear();
+
+ // 5. Commit a transaction to update Singer 4 and Singer 5 in the table.
+ mutations.add(updateRecordMutation(4, "FirstName9", "LastName9"));
+ mutations.add(updateRecordMutation(5, "FirstName9", "LastName9"));
+ Timestamp t5 = databaseClient.write(mutations);
+ LOG.debug("The fifth transaction committed with timestamp: " + t5.toString());
+ mutations.clear();
+
+ // 6. Commit a transaction to delete Singers 3, 4, 5.
+ mutations.add(deleteRecordMutation(3));
+ mutations.add(deleteRecordMutation(4));
+ mutations.add(deleteRecordMutation(5));
+ Timestamp t6 = databaseClient.write(mutations);
+ LOG.debug("The sixth transaction committed with timestamp: " + t6.toString());
+ mutations.clear();
+
+ // 7. Commit a transaction to delete Singers 0, 2, 6, and 7 (Singer 0 was never inserted).
+ mutations.add(deleteRecordMutation(0));
+ mutations.add(deleteRecordMutation(2));
+ mutations.add(deleteRecordMutation(6));
+ mutations.add(deleteRecordMutation(7));
+ Timestamp t7 = databaseClient.write(mutations);
+ LOG.debug("The seventh transaction committed with timestamp: " + t7.toString());
+
+ return t7;
+ }
+
+ // Create an update mutation.
+ private static Mutation updateRecordMutation(long singerId, String firstName, String lastName) {
+ return Mutation.newUpdateBuilder(tableName)
+ .set("SingerId")
+ .to(singerId)
+ .set("FirstName")
+ .to(firstName)
+ .set("LastName")
+ .to(lastName)
+ .build();
+ }
+
+ // Create an insert mutation.
+ private static Mutation insertRecordMutation(long singerId, String firstName, String lastName) {
+ return Mutation.newInsertBuilder(tableName)
+ .set("SingerId")
+ .to(singerId)
+ .set("FirstName")
+ .to(firstName)
+ .set("LastName")
+ .to(lastName)
+ .build();
+ }
+
+ // Create a delete mutation.
+ private static Mutation deleteRecordMutation(long singerId) {
+ return Mutation.delete(tableName, KeySet.newBuilder().addKey(Key.of(singerId)).build());
+ }
+}
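SortKey orders records by commit timestamp and breaks ties by transaction ID. The stand-alone sketch below illustrates that ordering under stated assumptions: `SortKeySketch` is a hypothetical stand-in for SortKey, and `java.time.Instant` stands in for `com.google.cloud.Timestamp` (both are `Comparable`). The hypothetical `toDoubleSeconds` helper mirrors the `comparingDouble` key (seconds plus nanos divided by 1e9) to show that two timestamps 1 ns apart collapse to the same `double` at 2022-era epoch values, whereas comparing the timestamps directly keeps them distinct.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SortKeyOrderingSketch {

  // Hypothetical stand-in for SortKey; java.time.Instant replaces com.google.cloud.Timestamp.
  static final class SortKeySketch {
    private final Instant commitTimestamp;
    private final String transactionId;

    SortKeySketch(Instant commitTimestamp, String transactionId) {
      this.commitTimestamp = commitTimestamp;
      this.transactionId = transactionId;
    }

    Instant getCommitTimestamp() {
      return commitTimestamp;
    }

    String getTransactionId() {
      return transactionId;
    }
  }

  // Order by commit timestamp first, then break ties by transaction ID.
  static final Comparator<SortKeySketch> ORDER =
      Comparator.comparing(SortKeySketch::getCommitTimestamp)
          .thenComparing(SortKeySketch::getTransactionId);

  // Returns the transaction IDs of the given keys in sorted order.
  static List<String> sortedIds(List<SortKeySketch> keys) {
    List<SortKeySketch> copy = new ArrayList<>(keys);
    copy.sort(ORDER);
    List<String> ids = new ArrayList<>();
    for (SortKeySketch key : copy) {
      ids.add(key.getTransactionId());
    }
    return ids;
  }

  // Mirrors the double-based key of the comparingDouble version of compareTo.
  static double toDoubleSeconds(Instant instant) {
    return instant.getEpochSecond() + instant.getNano() / 1_000_000_000.0;
  }

  public static void main(String[] args) {
    Instant base = Instant.parse("2022-03-18T04:25:45Z");
    List<SortKeySketch> keys = new ArrayList<>();
    keys.add(new SortKeySketch(base.plusNanos(1), "txn-a"));
    keys.add(new SortKeySketch(base, "txn-z"));
    // Exact comparison keeps "txn-z" first because its timestamp is 1 ns earlier.
    System.out.println(sortedIds(keys));
    // The double conversion cannot tell the two timestamps apart at this magnitude.
    System.out.println(toDoubleSeconds(base) == toDoubleSeconds(base.plusNanos(1)));
  }
}
```

Running `main` prints `[txn-z, txn-a]` and then `true`: with the double key the two timestamps would tie, and the tie-break on transaction ID would put `txn-a` first instead.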
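writeTransactionsToDatabase() commits seven transactions in sequence. As a rough check of which rows each transaction touches, the sketch below replays the same steps against a hypothetical in-memory `TreeMap` standing in for the Singers table; it does not talk to Spanner. It assumes Spanner-like semantics (inserting an existing key and updating a missing key both fail, while deleting a missing key, such as Singer 0, is treated as a no-op) and applies each transaction's mutations one at a time, which is only equivalent to an atomic commit here because no transaction touches the same key twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class TransactionSequenceSketch {

  // Hypothetical in-memory stand-in for the Singers table: SingerId -> "FirstName LastName".
  private final TreeMap<Long, String> table = new TreeMap<>();

  // Assumed insert semantics: inserting an existing key fails.
  void insert(long singerId, String firstName, String lastName) {
    if (table.containsKey(singerId)) {
      throw new IllegalStateException("Row already exists: " + singerId);
    }
    table.put(singerId, firstName + " " + lastName);
  }

  // Assumed update semantics: updating a missing key fails.
  void update(long singerId, String firstName, String lastName) {
    if (!table.containsKey(singerId)) {
      throw new IllegalStateException("Row not found: " + singerId);
    }
    table.put(singerId, firstName + " " + lastName);
  }

  // Assumed delete semantics: deleting a missing key (such as Singer 0) is a no-op.
  void delete(long singerId) {
    table.remove(singerId);
  }

  // Returns the SingerIds currently in the table, in ascending order.
  List<Long> keys() {
    return new ArrayList<>(table.keySet());
  }

  // Replays the seven transactions and snapshots the remaining SingerIds after each one.
  static List<List<Long>> replay() {
    TransactionSequenceSketch db = new TransactionSequenceSketch();
    List<List<Long>> snapshots = new ArrayList<>();
    // 1. Insert Singers 1 and 2.
    db.insert(1, "FirstName1", "LastName1");
    db.insert(2, "FirstName2", "LastName2");
    snapshots.add(db.keys());
    // 2. Insert Singer 3 and delete Singer 1.
    db.insert(3, "FirstName3", "LastName3");
    db.delete(1);
    snapshots.add(db.keys());
    // 3. Insert Singers 4, 5, and 6.
    db.insert(4, "FirstName4", "LastName4");
    db.insert(5, "FirstName5", "LastName5");
    db.insert(6, "FirstName6", "LastName6");
    snapshots.add(db.keys());
    // 4. Insert Singer 7 and update Singer 6.
    db.insert(7, "FirstName7", "LastName7");
    db.update(6, "FirstName5", "LastName5");
    snapshots.add(db.keys());
    // 5. Update Singers 4 and 5.
    db.update(4, "FirstName9", "LastName9");
    db.update(5, "FirstName9", "LastName9");
    snapshots.add(db.keys());
    // 6. Delete Singers 3, 4, and 5.
    db.delete(3);
    db.delete(4);
    db.delete(5);
    snapshots.add(db.keys());
    // 7. Delete Singers 0, 2, 6, and 7.
    db.delete(0);
    db.delete(2);
    db.delete(6);
    db.delete(7);
    snapshots.add(db.keys());
    return snapshots;
  }

  public static void main(String[] args) {
    List<List<Long>> snapshots = replay();
    for (int i = 0; i < snapshots.size(); i++) {
      System.out.println("After transaction " + (i + 1) + ": " + snapshots.get(i));
    }
  }
}
```

Under these assumptions every step succeeds, and the table is empty after the seventh transaction.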