This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dcd984d  [BEAM-12164] Add Spanner Change Stream DAOs
     new 759da41  Merge pull request #16124 from [BEAM-12164] Add Spanner 
Change Stream DAOs
dcd984d is described below

commit dcd984d30707647346417988e8f202871aa782a1
Author: Thiago Nunes <[email protected]>
AuthorDate: Sat Dec 4 01:14:23 2021 +1100

    [BEAM-12164] Add Spanner Change Stream DAOs
    
    Adds the DAO classes used to perform change stream queries in Cloud Spanner.
    When the given partition token is the initial partition, the DAO queries
    with a null partition token instead.
---
 .../spanner/changestreams/dao/ChangeStreamDao.java | 120 ++++++++++++++++++
 .../changestreams/dao/ChangeStreamResultSet.java   | 137 +++++++++++++++++++++
 .../dao/ChangeStreamResultSetMetadata.java         |  92 ++++++++++++++
 .../{encoder => dao}/package-info.java             |  10 +-
 .../changestreams/encoder/package-info.java        |   3 +
 5 files changed, 360 insertions(+), 2 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java
new file mode 100644
index 0000000..8ea1c30
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.Options;
+import com.google.cloud.spanner.Options.RpcPriority;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Statement;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+
+/**
+ * Responsible for making change stream queries for a given partition. The result is given back as
+ * a {@link ChangeStreamResultSet} wrapping the underlying Spanner {@link ResultSet}, which can be
+ * consumed until the stream is finished.
+ */
+public class ChangeStreamDao {
+
+  /** Value of the {@code action} entry in the tag attached to every change stream query. */
+  private static final String REQUEST_TAG = "change_stream";
+
+  private final String changeStreamName;
+  private final DatabaseClient databaseClient;
+  private final RpcPriority rpcPriority;
+  private final String jobName;
+
+  /**
+   * Constructs a change stream dao. All the queries performed by this class will be for the given
+   * change stream name with the specified rpc priority. The job name will be used to tag all the
+   * queries made.
+   *
+   * <p>The change stream name is an identifier and therefore cannot be bound as a query
+   * parameter: it is concatenated verbatim into the query text, so callers must only pass a valid,
+   * trusted change stream name.
+   *
+   * @param changeStreamName the name of the change stream to be queried
+   * @param databaseClient a spanner {@link DatabaseClient}
+   * @param rpcPriority the priority to be used for the change stream queries
+   * @param jobName the name of the job performing the query, used in the request tag
+   */
+  ChangeStreamDao(
+      String changeStreamName,
+      DatabaseClient databaseClient,
+      RpcPriority rpcPriority,
+      String jobName) {
+    this.changeStreamName = changeStreamName;
+    this.databaseClient = databaseClient;
+    this.rpcPriority = rpcPriority;
+    this.jobName = jobName;
+  }
+
+  /**
+   * Performs a change stream query. If the partition token given is the initial partition, null
+   * will be used in the query instead. The change stream query will be tagged as follows: {@code
+   * "action=<REQUEST_TAG>,job=<jobName>"}. The result will be given as a {@link
+   * ChangeStreamResultSet} which can be consumed as a stream, yielding records until no more are
+   * available for the query made. Note that one needs to call {@link ChangeStreamResultSet#next()}
+   * to initiate the change stream query.
+   *
+   * @param partitionToken the unique partition token to be queried. If {@link
+   *     InitialPartition#PARTITION_TOKEN} is given, null will be used in the change stream query
+   *     instead.
+   * @param startTimestamp the inclusive start time for the change stream query
+   * @param endTimestamp the inclusive end time for the change stream query. Null can be provided to
+   *     indicate no end time is available
+   * @param heartbeatMillis the idle interval, in milliseconds, after which the change stream query
+   *     emits a heartbeat record
+   * @return a {@link ChangeStreamResultSet} that will produce a stream of records for the change
+   *     stream query; the caller is responsible for closing it
+   */
+  public ChangeStreamResultSet changeStreamQuery(
+      String partitionToken,
+      Timestamp startTimestamp,
+      @Nullable Timestamp endTimestamp,
+      long heartbeatMillis) {
+    // For the initial partition we query with a null partition token
+    final @Nullable String partitionTokenOrNull =
+        InitialPartition.isInitialPartition(partitionToken) ? null : partitionToken;
+
+    // The change stream name is an identifier and cannot be a bound parameter, so it is the only
+    // part concatenated into the SQL text; every value is passed through a named parameter.
+    final String query =
+        "SELECT * FROM READ_"
+            + changeStreamName
+            + "("
+            + "   start_timestamp => @startTimestamp,"
+            + "   end_timestamp => @endTimestamp,"
+            + "   partition_token => @partitionToken,"
+            + "   read_options => null,"
+            + "   heartbeat_milliseconds => @heartbeatMillis"
+            + ")";
+    final ResultSet resultSet =
+        databaseClient
+            .singleUse()
+            .executeQuery(
+                Statement.newBuilder(query)
+                    .bind("startTimestamp")
+                    .to(startTimestamp)
+                    .bind("endTimestamp")
+                    .to(endTimestamp)
+                    .bind("partitionToken")
+                    .to(partitionTokenOrNull)
+                    .bind("heartbeatMillis")
+                    .to(heartbeatMillis)
+                    .build(),
+                Options.priority(rpcPriority),
+                Options.tag("action=" + REQUEST_TAG + ",job=" + jobName));
+
+    return new ChangeStreamResultSet(resultSet);
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
new file mode 100644
index 0000000..0ed690a
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Struct;
+import org.joda.time.Duration;
+
+/**
+ * Decorator class over a {@link ResultSet} that provides telemetry for the streamed records. It
+ * will be returned for a change stream query. By using this class one can obtain the following
+ * metadata:
+ *
+ * <ul>
+ *   <li>The timestamp at which the query first started.
+ *   <li>The timestamp at which a record streaming started.
+ *   <li>The timestamp at which a record streaming ended.
+ *   <li>The timestamp at which a record was read by the caller.
+ *   <li>The total time for streaming all records within the query.
+ *   <li>The total number of records streamed within the query.
+ * </ul>
+ *
+ * <p>Timestamps that have not been recorded yet are reported as {@link Timestamp#MIN_VALUE}. This
+ * class keeps its telemetry in plain, unsynchronized fields.
+ */
+public class ChangeStreamResultSet implements AutoCloseable {
+
+  private final ResultSet resultSet;
+  private Timestamp queryStartedAt;
+  private Timestamp recordStreamStartedAt;
+  private Timestamp recordStreamEndedAt;
+  private Timestamp recordReadAt;
+  private Duration totalStreamDuration;
+  private long numberOfRecordsRead;
+
+  /**
+   * Constructs a change stream result set on top of the received {@link ResultSet}.
+   *
+   * @param resultSet the {@link ResultSet} to be decorated
+   */
+  ChangeStreamResultSet(ResultSet resultSet) {
+    this.resultSet = resultSet;
+    // Timestamp.MIN_VALUE is the "not recorded yet" sentinel for every timestamp below.
+    this.queryStartedAt = Timestamp.MIN_VALUE;
+    this.recordStreamStartedAt = Timestamp.MIN_VALUE;
+    this.recordStreamEndedAt = Timestamp.MIN_VALUE;
+    this.recordReadAt = Timestamp.MIN_VALUE;
+    this.totalStreamDuration = Duration.ZERO;
+    this.numberOfRecordsRead = 0L;
+  }
+
+  /**
+   * Moves the pointer to the next record in the {@link ResultSet} if there is one. It also gathers
+   * metrics for the next record, such as:
+   *
+   * <ul>
+   *   <li>If this is the first call, records the time at which the query started.
+   *   <li>The timestamp at which a record streaming started.
+   *   <li>The timestamp at which a record streaming ended.
+   *   <li>Increments the total time for streaming all the records so far.
+   *   <li>Increments the total number of records streamed so far, only when a record is returned.
+   * </ul>
+   *
+   * @return true if there is another record within the result set. Returns false otherwise.
+   */
+  public boolean next() {
+    // queryStartedAt is initialised to the MIN_VALUE sentinel (never null), so compare against the
+    // sentinel to record the query start time exactly once, on the first call.
+    if (Timestamp.MIN_VALUE.equals(queryStartedAt)) {
+      queryStartedAt = Timestamp.now();
+    }
+    recordStreamStartedAt = Timestamp.now();
+    final boolean hasNext = resultSet.next();
+    recordStreamEndedAt = Timestamp.now();
+    // The final call that reports the end of the stream yields no record, so it is not counted.
+    if (hasNext) {
+      numberOfRecordsRead++;
+    }
+    totalStreamDuration =
+        totalStreamDuration.plus(
+            new Duration(
+                recordStreamStartedAt.toSqlTimestamp().getTime(),
+                recordStreamEndedAt.toSqlTimestamp().getTime()));
+    return hasNext;
+  }
+
+  /**
+   * Returns the record at the current pointer as a {@link Struct}. It also updates the timestamp at
+   * which the record was read.
+   *
+   * <p>Calling this method before {@link ChangeStreamResultSet#next()} has been called, or after it
+   * has returned false, is delegated to the underlying {@link ResultSet#getCurrentRowAsStruct()},
+   * which defines the outcome (it may throw rather than return null).
+   *
+   * @return a change stream record as a {@link Struct}
+   */
+  public Struct getCurrentRowAsStruct() {
+    recordReadAt = Timestamp.now();
+    return resultSet.getCurrentRowAsStruct();
+  }
+
+  /**
+   * Returns the gathered metadata for the change stream query so far.
+   *
+   * @return a {@link ChangeStreamResultSetMetadata} containing telemetry information for the query
+   *     so far
+   */
+  public ChangeStreamResultSetMetadata getMetadata() {
+    return new ChangeStreamResultSetMetadata(
+        queryStartedAt,
+        recordStreamStartedAt,
+        recordStreamEndedAt,
+        recordReadAt,
+        totalStreamDuration,
+        numberOfRecordsRead);
+  }
+
+  /**
+   * Closes the current change stream {@link ResultSet}. The stream will be terminated when this
+   * method is called.
+   *
+   * <p>This method must always be called after the consumption of the result set or when an error
+   * occurs. This makes sure there are no Session leaks and all the underlying resources are
+   * released.
+   */
+  @Override
+  public void close() {
+    resultSet.close();
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetMetadata.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetMetadata.java
new file mode 100644
index 0000000..9ddc48e
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSetMetadata.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import com.google.cloud.Timestamp;
+import org.joda.time.Duration;
+
+/**
+ * Snapshot of the telemetry gathered while a change stream query is being consumed. All values are
+ * held in final fields that are assigned once, at construction time. The snapshot exposes:
+ *
+ * <ul>
+ *   <li>when the change stream query first started;
+ *   <li>when the streaming of a record started;
+ *   <li>when the streaming of a record ended;
+ *   <li>when a record was read by the caller;
+ *   <li>the accumulated time spent streaming records within the query;
+ *   <li>the number of records streamed within the query.
+ * </ul>
+ */
+public class ChangeStreamResultSetMetadata {
+  private final Timestamp queryStartedAt;
+  private final Timestamp recordStreamStartedAt;
+  private final Timestamp recordStreamEndedAt;
+  private final Timestamp recordReadAt;
+  private final Duration totalStreamDuration;
+  private final long numberOfRecordsRead;
+
+  /**
+   * Creates a metadata snapshot from already-gathered telemetry values.
+   *
+   * @param queryStarted when the change stream query first started
+   * @param streamStarted when the streaming of a record started
+   * @param streamEnded when the streaming of a record ended
+   * @param readAt when a record was read by the caller
+   * @param streamDuration the accumulated time spent streaming records
+   * @param recordsRead the number of records read so far
+   */
+  ChangeStreamResultSetMetadata(
+      Timestamp queryStarted,
+      Timestamp streamStarted,
+      Timestamp streamEnded,
+      Timestamp readAt,
+      Duration streamDuration,
+      long recordsRead) {
+    queryStartedAt = queryStarted;
+    recordStreamStartedAt = streamStarted;
+    recordStreamEndedAt = streamEnded;
+    recordReadAt = readAt;
+    totalStreamDuration = streamDuration;
+    numberOfRecordsRead = recordsRead;
+  }
+
+  /** Gives the time at which the {@link ChangeStreamResultSet} query first started. */
+  public Timestamp getQueryStartedAt() {
+    return this.queryStartedAt;
+  }
+
+  /** Gives the time at which the streaming of a record started. */
+  public Timestamp getRecordStreamStartedAt() {
+    return this.recordStreamStartedAt;
+  }
+
+  /** Gives the time at which the streaming of a record ended. */
+  public Timestamp getRecordStreamEndedAt() {
+    return this.recordStreamEndedAt;
+  }
+
+  /** Gives the time at which a record was read from the {@link ChangeStreamResultSet}. */
+  public Timestamp getRecordReadAt() {
+    return this.recordReadAt;
+  }
+
+  /** Gives the accumulated time spent streaming change stream records so far. */
+  public Duration getTotalStreamDuration() {
+    return this.totalStreamDuration;
+  }
+
+  /** Gives the number of records read from the change stream so far. */
+  public long getNumberOfRecordsRead() {
+    return this.numberOfRecordsRead;
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/package-info.java
similarity index 76%
copy from 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
copy to 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/package-info.java
index 6cb7499..2169c46 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/package-info.java
@@ -16,5 +16,11 @@
  * limitations under the License.
  */
 
-/** User model for the Spanner change stream API. */
-package org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder;
+/**
+ * Database Access Objects for querying change streams and modifying the 
Connector's metadata
+ * tables.
+ */
+@Experimental
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
index 6cb7499..d5aca7d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/encoder/package-info.java
@@ -17,4 +17,7 @@
  */
 
 /** User model for the Spanner change stream API. */
+@Experimental
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder;
+
+import org.apache.beam.sdk.annotations.Experimental;

Reply via email to