This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dcd984d [BEAM-12164] Add Spanner Change Stream DAOs
new 759da41 Merge pull request #16124 from [BEAM-12164] Add Spanner
Change Stream DAOs
dcd984d is described below
commit dcd984d30707647346417988e8f202871aa782a1
Author: Thiago Nunes <[email protected]>
AuthorDate: Sat Dec 4 01:14:23 2021 +1100
[BEAM-12164] Add Spanner Change Stream DAOs
Adds the DAO classes to perform a change stream query in Cloud Spanner.
The DAO class considers if the initial partition is given and uses a
null partition token in that case.
---
.../spanner/changestreams/dao/ChangeStreamDao.java | 120 ++++++++++++++++++
.../changestreams/dao/ChangeStreamResultSet.java | 137 +++++++++++++++++++++
.../dao/ChangeStreamResultSetMetadata.java | 92 ++++++++++++++
.../{encoder => dao}/package-info.java | 10 +-
.../changestreams/encoder/package-info.java | 3 +
5 files changed, 360 insertions(+), 2 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java
new file mode 100644
index 0000000..8ea1c30
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.Options;
+import com.google.cloud.spanner.Options.RpcPriority;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Statement;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+
+/**
+ * Responsible for making change stream queries for a given partition. The
result will be given back
+ * as a {@link ResultSet}, which can be consumed until the stream is finished.
+ */
+public class ChangeStreamDao {
+
+ private static final String REQUEST_TAG = "change_stream";
+
+ private final String changeStreamName;
+ private final DatabaseClient databaseClient;
+ private final RpcPriority rpcPriority;
+ private final String jobName;
+
+ /**
+ * Constructs a change stream dao. All the queries performed by this class
will be for the given
+ * change stream name with the specified rpc priority. The job name will be
used to tag all the
+ * queries made.
+ *
+ * @param changeStreamName the name of the change stream to be queried
+ * @param databaseClient a spanner {@link DatabaseClient}
+ * @param rpcPriority the priority to be used for the change stream queries
+ * @param jobName the name of the job performing the query
+ */
+ ChangeStreamDao(
+ String changeStreamName,
+ DatabaseClient databaseClient,
+ RpcPriority rpcPriority,
+ String jobName) {
+ this.changeStreamName = changeStreamName;
+ this.databaseClient = databaseClient;
+ this.rpcPriority = rpcPriority;
+ this.jobName = jobName;
+ }
+
+ /**
+ * Performs a change stream query. If the partition token given is the
initial partition null will
+ * be used in the query instead. The change stream query will be tagged as
following: {@code
+ * "action=<REQUEST_TAG>, job=<jobName>"}. The result will be given as a
{@link
+ * ChangeStreamResultSet} which can be consumed as a stream, yielding
records until no more are
+ * available for the query made. Note that one needs to call {@link
ChangeStreamResultSet#next()}
+ * to initiate the change stream query.
+ *
+ * @param partitionToken the unique partition token to be queried. If {@link
+ * InitialPartition#PARTITION_TOKEN} is given, null will be used in the
change stream query
+ * instead.
+ * @param startTimestamp the inclusive start time for the change stream query
+ * @param endTimestamp the inclusive end time for the change stream query.
Null can be provided to
+ * indicate no end time is available
+ * @param heartbeatMillis the number of milliseconds after the stream is
idle, which a heartbeat
+ * record will be emitted in the change stream query
+ * @return a {@link ChangeStreamResultSet} that will produce a stream of
records for the change
+ * stream query
+ */
+ public ChangeStreamResultSet changeStreamQuery(
+ String partitionToken,
+ Timestamp startTimestamp,
+ @Nullable Timestamp endTimestamp,
+ long heartbeatMillis) {
+ // For the initial partition we query with a null partition token
+ final String partitionTokenOrNull =
+ InitialPartition.isInitialPartition(partitionToken) ? null :
partitionToken;
+
+ final String query =
+ "SELECT * FROM READ_"
+ + changeStreamName
+ + "("
+ + " start_timestamp => @startTimestamp,"
+ + " end_timestamp => @endTimestamp,"
+ + " partition_token => @partitionToken,"
+ + " read_options => null,"
+ + " heartbeat_milliseconds => @heartbeatMillis"
+ + ")";
+ final ResultSet resultSet =
+ databaseClient
+ .singleUse()
+ .executeQuery(
+ Statement.newBuilder(query)
+ .bind("startTimestamp")
+ .to(startTimestamp)
+ .bind("endTimestamp")
+ .to(endTimestamp)
+ .bind("partitionToken")
+ .to(partitionTokenOrNull)
+ .bind("heartbeatMillis")
+ .to(heartbeatMillis)
+ .build(),
+ Options.priority(rpcPriority),
+ Options.tag("action=" + REQUEST_TAG + ",job=" + jobName));
+
+ return new ChangeStreamResultSet(resultSet);
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
new file mode 100644
index 0000000..0ed690a
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Struct;
+import org.joda.time.Duration;
+
+/**
+ * Decorator class over a {@link ResultSet} that provides telemetry for the
streamed records. It
+ * will be returned for a change stream query. By using this class one can
obtain the following
+ * metadata:
+ *
+ * <ul>
+ * <li>The timestamp at which the query first started.
+ * <li>The timestamp at which a record streaming started.
+ * <li>The timestamp at which a record streaming ended.
+ * <li>The timestamp at which a record was read by the caller.
+ * <li>The total time for streaming all records within the query.
+ * <li>The total number of records streamed within the query.
+ * </ul>
+ */
+public class ChangeStreamResultSet implements AutoCloseable {
+
+ private final ResultSet resultSet;
+ private Timestamp queryStartedAt;
+ private Timestamp recordStreamStartedAt;
+ private Timestamp recordStreamEndedAt;
+ private Timestamp recordReadAt;
+ private Duration totalStreamDuration;
+ private long numberOfRecordsRead;
+
+ /**
+ * Constructs a change stream result set on top of the received {@link
ResultSet}.
+ *
+ * @param resultSet the {@link ResultSet} to be decorated
+ */
+ ChangeStreamResultSet(ResultSet resultSet) {
+ this.resultSet = resultSet;
+ this.queryStartedAt = Timestamp.MIN_VALUE;
+ this.recordStreamStartedAt = Timestamp.MIN_VALUE;
+ this.recordStreamEndedAt = Timestamp.MIN_VALUE;
+ this.recordReadAt = Timestamp.MIN_VALUE;
+ this.totalStreamDuration = Duration.ZERO;
+ this.numberOfRecordsRead = 0L;
+ }
+
+ /**
+ * Moves the pointer to the next record in the {@link ResultSet} if there is
one. It also gathers
+ * metrics for the next record, such as:
+ *
+ * <ul>
+ * <li>If this is the first record consumed, updates the time at which the
query started.
+ * <li>The timestamp at which a record streaming started.
+ * <li>The timestamp at which a record streaming ended.
+ * <li>Increments the total time for streaming the all the records so far.
+ * <li>Increments the total number of all the records streamed so far.
+ * </ul>
+ *
+ * @return true if there is another record within the result set. Returns
false otherwise.
+ */
+ public boolean next() {
+ if (queryStartedAt == null) {
+ queryStartedAt = Timestamp.now();
+ }
+ recordStreamStartedAt = Timestamp.now();
+ final boolean hasNext = resultSet.next();
+ numberOfRecordsRead++;
+ recordStreamEndedAt = Timestamp.now();
+ totalStreamDuration =
+ totalStreamDuration.withDurationAdded(
+ new Duration(
+ recordStreamStartedAt.toSqlTimestamp().getTime(),
+ recordStreamEndedAt.toSqlTimestamp().getTime()),
+ 1);
+ return hasNext;
+ }
+
+ /**
+ * Returns the record at the current pointer as a {@link Struct}. It also
updates the timestamp at
+ * which the record was read.
+ *
+ * <p>If {@link ChangeStreamResultSet#next()} was not called or if it was
called but there are no
+ * more records in the stream, null will be returned.
+ *
+ * @return a change stream record as a {@link Struct} or null
+ */
+ public Struct getCurrentRowAsStruct() {
+ recordReadAt = Timestamp.now();
+ return resultSet.getCurrentRowAsStruct();
+ }
+
+ /**
+ * Returns the gathered metadata for the change stream query so far.
+ *
+ * @return a {@link ChangeStreamResultSetMetadata} contained telemetry
information for the query
+ * so far
+ */
+ public ChangeStreamResultSetMetadata getMetadata() {
+ return new ChangeStreamResultSetMetadata(
+ queryStartedAt,
+ recordStreamStartedAt,
+ recordStreamEndedAt,
+ recordReadAt,
+ totalStreamDuration,
+ numberOfRecordsRead);
+ }
+
+ /**
+ * Closes the current change stream {@link ResultSet}. The stream will be
terminated when this
+ * method is called.
+ *
+ * <p>This method must always be called after the consumption of the result
set or when an error
+ * occurs. This makes sure there are no Session leaks and all the underlying
resources are
+ * released.
+ */
+ @Override
+ public void close() {
+ resultSet.close();
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetMetadata.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetMetadata.java
new file mode 100644
index 0000000..9ddc48e
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetMetadata.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import com.google.cloud.Timestamp;
+import org.joda.time.Duration;
+
+/**
+ * Represents telemetry metadata gathered during the consumption of a change
stream query. Within
+ * this class the caller will be able to retrieve the following information:
+ *
+ * <ul>
+ * <li>The timestamp at which the query first started.
+ * <li>The timestamp at which a record streaming started.
+ * <li>The timestamp at which a record streaming ended.
+ * <li>The timestamp at which a record was read by the caller.
+ * <li>The total time for streaming all records within the query.
+ * <li>The total number of records streamed within the query.
+ * </ul>
+ */
+public class ChangeStreamResultSetMetadata {
+ private final Timestamp queryStartedAt;
+ private final Timestamp recordStreamStartedAt;
+ private final Timestamp recordStreamEndedAt;
+ private final Timestamp recordReadAt;
+ private final Duration totalStreamDuration;
+ private final long numberOfRecordsRead;
+
+ /** Constructs a change stream result set metadata with the given telemetry
information. */
+ ChangeStreamResultSetMetadata(
+ Timestamp queryStartedAt,
+ Timestamp recordStreamStartedAt,
+ Timestamp recordStreamEndedAt,
+ Timestamp recordReadAt,
+ Duration totalStreamDuration,
+ long numberOfRecordsRead) {
+ this.queryStartedAt = queryStartedAt;
+ this.recordStreamStartedAt = recordStreamStartedAt;
+ this.recordStreamEndedAt = recordStreamEndedAt;
+ this.recordReadAt = recordReadAt;
+ this.totalStreamDuration = totalStreamDuration;
+ this.numberOfRecordsRead = numberOfRecordsRead;
+ }
+
+ /**
+ * Returns the timestamp at which the change stream query for a {@link
ChangeStreamResultSet}
+ * first started.
+ */
+ public Timestamp getQueryStartedAt() {
+ return queryStartedAt;
+ }
+
+ /** Returns the timestamp at which a record first started to be streamed. */
+ public Timestamp getRecordStreamStartedAt() {
+ return recordStreamStartedAt;
+ }
+
+ /** Returns the timestamp at which a record finished to be streamed. */
+ public Timestamp getRecordStreamEndedAt() {
+ return recordStreamEndedAt;
+ }
+
+ /** Returns the timestamp at which a record was read from the {@link
ChangeStreamResultSet}. */
+ public Timestamp getRecordReadAt() {
+ return recordReadAt;
+ }
+
+ /** Returns the total stream duration of change stream records so far. */
+ public Duration getTotalStreamDuration() {
+ return totalStreamDuration;
+ }
+
+ /** Returns the total number of records read from the change stream so far.
*/
+ public long getNumberOfRecordsRead() {
+ return numberOfRecordsRead;
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/package-info.java
similarity index 76%
copy from
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
copy to
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/package-info.java
index 6cb7499..2169c46 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/package-info.java
@@ -16,5 +16,11 @@
* limitations under the License.
*/
-/** User model for the Spanner change stream API. */
-package org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder;
+/**
+ * Database Access Objects for querying change streams and modifying the
Connector's metadata
+ * tables.
+ */
+@Experimental
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
index 6cb7499..d5aca7d 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
@@ -17,4 +17,7 @@
*/
/** User model for the Spanner change stream API. */
+@Experimental
package org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder;
+
+import org.apache.beam.sdk.annotations.Experimental;