This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by 
this push:
     new 331b40117ea Not including support for end time in Bigtable Change 
Stream connector (#25474)
331b40117ea is described below

commit 331b40117eac3981ea319265bad18739e1dce297
Author: Tony Tang <[email protected]>
AuthorDate: Wed Feb 15 11:48:49 2023 -0500

    Not including support for end time in Bigtable Change Stream connector 
(#25474)
---
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       | 20 ++---------------
 .../changestreams/action/ActionFactory.java        | 10 +++------
 .../action/DetectNewPartitionsAction.java          | 16 -------------
 .../action/GenerateInitialPartitionsAction.java    |  9 ++------
 .../action/ReadChangeStreamPartitionAction.java    | 26 ++++++----------------
 .../changestreams/dao/ChangeStreamDao.java         |  8 +------
 .../dofn/DetectNewPartitionsDoFn.java              | 12 +++-------
 .../changestreams/model/PartitionRecord.java       | 25 ++-------------------
 .../action/DetectNewPartitionsActionTest.java      |  5 +----
 .../GenerateInitialPartitionsActionTest.java       |  4 +---
 10 files changed, 22 insertions(+), 113 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index d2eff9cbbfa..00048b73d08 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -222,8 +222,7 @@ import org.slf4j.LoggerFactory;
  *            .withInstanceId(instanceId)
  *            .withTableId(tableId)
  *            .withAppProfileId(appProfileId)
- *            .withStartTime(startTime)
- *            .withEndTime(endTime));
+ *            .withStartTime(startTime));
  * }</pre>
  *
  * <h3>Permissions</h3>
@@ -281,7 +280,6 @@ public class BigtableIO {
    *
    * <ul>
    *   <li>{@link BigtableIO.ReadChangeStream#withStartTime} which defaults to 
now.
-   *   <li>{@link BigtableIO.ReadChangeStream#withEndTime} which defaults to 
empty.
    *   <li>{@link BigtableIO.ReadChangeStream#withHeartbeatDuration} with 
defaults to 1 seconds.
    *   <li>{@link BigtableIO.ReadChangeStream#withMetadataTableProjectId} 
which defaults to value
    *       from {@link BigtableIO.ReadChangeStream#withProjectId}
@@ -1544,8 +1542,6 @@ public class BigtableIO {
 
     abstract @Nullable Timestamp getStartTime();
 
-    abstract @Nullable Timestamp getEndTime();
-
     abstract @Nullable Duration getHeartbeatDuration();
 
     abstract @Nullable String getChangeStreamName();
@@ -1657,16 +1653,6 @@ public class BigtableIO {
       return toBuilder().setStartTime(startTime).build();
     }
 
-    /**
-     * Returns a new {@link BigtableIO.ReadChangeStream} that will stop 
streaming at the specified
-     * end time.
-     *
-     * <p>Does not modify this object.
-     */
-    public ReadChangeStream withEndTime(Timestamp endTime) {
-      return toBuilder().setEndTime(endTime).build();
-    }
-
     /**
      * Returns a new {@link BigtableIO.ReadChangeStream} that will send 
heartbeat messages at
      * specified interval.
@@ -1800,7 +1786,7 @@ public class BigtableIO {
       InitializeDoFn initializeDoFn =
           new InitializeDoFn(daoFactory, 
metadataTableConfig.getAppProfileId().get(), startTime);
       DetectNewPartitionsDoFn detectNewPartitionsDoFn =
-          new DetectNewPartitionsDoFn(getEndTime(), actionFactory, daoFactory, 
metrics);
+          new DetectNewPartitionsDoFn(actionFactory, daoFactory, metrics);
       ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
           new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, 
actionFactory, metrics);
 
@@ -1821,8 +1807,6 @@ public class BigtableIO {
 
       abstract ReadChangeStream.Builder setStartTime(Timestamp startTime);
 
-      abstract ReadChangeStream.Builder setEndTime(Timestamp endTime);
-
       abstract ReadChangeStream.Builder setHeartbeatDuration(Duration 
interval);
 
       abstract ReadChangeStream.Builder setChangeStreamName(String 
changeStreamName);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
index 18fbc5fe404..cfc5b5fd3b2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
@@ -17,9 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
-import com.google.cloud.Timestamp;
 import java.io.Serializable;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
@@ -66,12 +64,10 @@ public class ActionFactory implements Serializable {
   public synchronized DetectNewPartitionsAction detectNewPartitionsAction(
       ChangeStreamMetrics metrics,
       MetadataTableDao metadataTableDao,
-      @Nullable Timestamp endTime,
       GenerateInitialPartitionsAction generateInitialPartitionsAction) {
     if (detectNewPartitionsAction == null) {
       detectNewPartitionsAction =
-          new DetectNewPartitionsAction(
-              metrics, metadataTableDao, endTime, 
generateInitialPartitionsAction);
+          new DetectNewPartitionsAction(metrics, metadataTableDao, 
generateInitialPartitionsAction);
     }
     return detectNewPartitionsAction;
   }
@@ -85,10 +81,10 @@ public class ActionFactory implements Serializable {
    * @return singleton instance of the {@link GenerateInitialPartitionsAction}
    */
   public synchronized GenerateInitialPartitionsAction 
generateInitialPartitionsAction(
-      ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao, @Nullable 
Timestamp endTime) {
+      ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao) {
     if (generateInitialPartitionsAction == null) {
       generateInitialPartitionsAction =
-          new GenerateInitialPartitionsAction(metrics, changeStreamDao, 
endTime);
+          new GenerateInitialPartitionsAction(metrics, changeStreamDao);
     }
     return generateInitialPartitionsAction;
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
index 932796d4ba8..4e286fe6397 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
@@ -19,9 +19,7 @@ package 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
 import com.google.cloud.Timestamp;
 import com.google.protobuf.InvalidProtocolBufferException;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.range.OffsetRange;
@@ -55,17 +53,14 @@ public class DetectNewPartitionsAction {
 
   private final ChangeStreamMetrics metrics;
   private final MetadataTableDao metadataTableDao;
-  @Nullable private final com.google.cloud.Timestamp endTime;
   private final GenerateInitialPartitionsAction 
generateInitialPartitionsAction;
 
   public DetectNewPartitionsAction(
       ChangeStreamMetrics metrics,
       MetadataTableDao metadataTableDao,
-      @Nullable Timestamp endTime,
       GenerateInitialPartitionsAction generateInitialPartitionsAction) {
     this.metrics = metrics;
     this.metadataTableDao = metadataTableDao;
-    this.endTime = endTime;
     this.generateInitialPartitionsAction = generateInitialPartitionsAction;
   }
 
@@ -77,7 +72,6 @@ public class DetectNewPartitionsAction {
    *   <li>Look up the initial list of partitions to stream if it's the very 
first run.
    *   <li>On rest of the runs, try advancing watermark if needed.
    *   <li>Update the metadata table with info about this DoFn.
-   *   <li>Check if this pipeline has reached the end time. Terminate if it 
has.
    *   <li>Process new partitions and output them.
    *   <li>Register callback to clean up processed partitions after bundle has 
been finalized.
    * </ol>
@@ -103,16 +97,6 @@ public class DetectNewPartitionsAction {
       return generateInitialPartitionsAction.run(receiver, tracker, 
watermarkEstimator, startTime);
     }
 
-    // Terminate if endTime <= watermark that means all partitions have read 
up to or beyond
-    // watermark. We no longer need to manage splits and merges, we can 
terminate.
-    if (endTime != null
-        && endTime.compareTo(
-                
TimestampConverter.toCloudTimestamp(watermarkEstimator.currentWatermark()))
-            <= 0) {
-      tracker.tryClaim(tracker.currentRestriction().getTo());
-      return ProcessContinuation.stop();
-    }
-
     if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
       return ProcessContinuation.stop();
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
index 164a679a26f..938464f500a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
@@ -17,10 +17,8 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
-import com.google.cloud.Timestamp;
 import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
 import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
@@ -44,13 +42,11 @@ public class GenerateInitialPartitionsAction {
 
   private final ChangeStreamMetrics metrics;
   private final ChangeStreamDao changeStreamDao;
-  @Nullable private final Timestamp endTime;
 
   public GenerateInitialPartitionsAction(
-      ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao, @Nullable 
Timestamp endTime) {
+      ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao) {
     this.metrics = metrics;
     this.changeStreamDao = changeStreamDao;
-    this.endTime = endTime;
   }
 
   /**
@@ -78,8 +74,7 @@ public class GenerateInitialPartitionsAction {
     for (ByteStringRange partition : streamPartitions) {
       metrics.incListPartitionsCount();
       String uid = UniqueIdGenerator.getNextId();
-      PartitionRecord partitionRecord =
-          new PartitionRecord(partition, startTime, uid, startTime, endTime);
+      PartitionRecord partitionRecord = new PartitionRecord(partition, 
startTime, uid, startTime);
       // We are outputting elements with timestamp of 0 to prevent reliance on 
event time. This
       // limits the ability to window on commit time of any data changes. It 
is still possible to
       // window on processing time.
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
index 321a3c1a1ca..a36b57ff42f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
@@ -85,20 +85,12 @@ public class ReadChangeStreamPartitionAction {
    *   <li>Process CloseStream if it exists. In order to solve a possible 
inconsistent state
    *       problem, we do not process CloseStream after receiving it. We claim 
the CloseStream in
    *       the RestrictionTracker so it persists after a checkpoint. We 
checkpoint to flush all the
-   *       DataChanges. Then on resume, we process the CloseStream. There are 
only 2 expected Status
-   *       for CloseStream: OK and Out of Range.
-   *       <ol>
-   *         <li>OK status is returned when the predetermined endTime has been 
reached. In this
-   *             case, we update the watermark and update the metadata table. 
{@link
-   *             DetectNewPartitionsDoFn} aggregates the watermark from all 
the streams to ensure
-   *             all the streams have reached beyond endTime so it can also 
terminate and end the
-   *             beam job.
-   *         <li>Out of Range is returned when the partition has either been 
split into more
-   *             partitions or merged into a larger partition. In this case, 
we write to the
-   *             metadata table the new partitions' information so that {@link
-   *             DetectNewPartitionsDoFn} can read and output those new 
partitions to be streamed.
-   *             We also need to ensure we clean up this partition's metadata 
to release the lock.
-   *       </ol>
+   *       DataChanges. Then on resume, we process the CloseStream. There is 
only 1 expected Status
+   *       for CloseStream: Out of Range. Out of Range is returned when the 
partition has either
+   *       been split into more partitions or merged into a larger partition. 
In this case, we write
+   *       to the metadata table the new partitions' information so that {@link
+   *       DetectNewPartitionsDoFn} can read and output those new partitions 
to be streamed. We also
+   *       need to ensure we clean up this partition's metadata to release the 
lock.
    *   <li>Update the metadata table with the watermark and additional 
debugging info.
    *   <li>Stream the partition.
    * </ol>
@@ -145,11 +137,7 @@ public class ReadChangeStreamPartitionAction {
     try {
       stream =
           changeStreamDao.readChangeStreamPartition(
-              partitionRecord,
-              tracker.currentRestriction(),
-              partitionRecord.getEndTime(),
-              heartbeatDurationSeconds,
-              shouldDebug);
+              partitionRecord, tracker.currentRestriction(), 
heartbeatDurationSeconds, shouldDebug);
       for (ChangeStreamRecord record : stream) {
         Optional<ProcessContinuation> result =
             changeStreamAction.run(
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
index 98902866cb1..3cbd0fb7902 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
@@ -31,7 +31,6 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,8 +61,7 @@ public class ChangeStreamDao {
    * Streams a partition.
    *
    * @param partition the partition to stream
-   * @param streamProgress may contain a continuation token for the stream 
request
-   * @param endTime time to end the stream, may be null
+   * @param streamProgress may contain a continuation token for the stream 
request
    * @param heartbeatDurationSeconds period between heartbeat messages
    * @return stream of ReadChangeStreamResponse
    * @throws IOException if the stream could not be started
@@ -71,7 +69,6 @@ public class ChangeStreamDao {
   public ServerStream<ChangeStreamRecord> readChangeStreamPartition(
       PartitionRecord partition,
       StreamProgress streamProgress,
-      @Nullable Timestamp endTime,
       Duration heartbeatDurationSeconds,
       boolean shouldDebug)
       throws IOException {
@@ -92,9 +89,6 @@ public class ChangeStreamDao {
     } else {
       throw new IOException("Something went wrong");
     }
-    if (endTime != null) {
-      query.endTime(endTime.toProto());
-    }
     query.heartbeatDuration(heartbeatDurationSeconds.getStandardSeconds());
     if (shouldDebug) {
       LOG.info(
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
index 6e8f21cd00b..12f95ac160d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
 
 import java.io.IOException;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
@@ -42,7 +41,6 @@ import org.joda.time.Instant;
 @UnboundedPerElement
 public class DetectNewPartitionsDoFn extends DoFn<com.google.cloud.Timestamp, 
PartitionRecord> {
   private static final long serialVersionUID = 8052524268978107367L;
-  @Nullable private final com.google.cloud.Timestamp endTime;
 
   private final DaoFactory daoFactory;
   private final ChangeStreamMetrics metrics;
@@ -50,13 +48,9 @@ public class DetectNewPartitionsDoFn extends 
DoFn<com.google.cloud.Timestamp, Pa
   private DetectNewPartitionsAction detectNewPartitionsAction;
 
   public DetectNewPartitionsDoFn(
-      @Nullable com.google.cloud.Timestamp endTime,
-      ActionFactory actionFactory,
-      DaoFactory daoFactory,
-      ChangeStreamMetrics metrics) {
+      ActionFactory actionFactory, DaoFactory daoFactory, ChangeStreamMetrics 
metrics) {
     this.actionFactory = actionFactory;
     this.daoFactory = daoFactory;
-    this.endTime = endTime;
     this.metrics = metrics;
   }
 
@@ -92,10 +86,10 @@ public class DetectNewPartitionsDoFn extends 
DoFn<com.google.cloud.Timestamp, Pa
     final MetadataTableDao metadataTableDao = daoFactory.getMetadataTableDao();
     final ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao();
     GenerateInitialPartitionsAction generateInitialPartitionsAction =
-        actionFactory.generateInitialPartitionsAction(metrics, 
changeStreamDao, endTime);
+        actionFactory.generateInitialPartitionsAction(metrics, 
changeStreamDao);
     detectNewPartitionsAction =
         actionFactory.detectNewPartitionsAction(
-            metrics, metadataTableDao, endTime, 
generateInitialPartitionsAction);
+            metrics, metadataTableDao, generateInitialPartitionsAction);
   }
 
   @ProcessElement
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
index 15ebf4c8fd6..c94011c8f7a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
@@ -40,43 +40,26 @@ public class PartitionRecord implements Serializable {
   private ByteStringRange partition;
   @Nullable private Timestamp startTime;
   @Nullable private List<ChangeStreamContinuationToken> 
changeStreamContinuationTokens;
-  @Nullable private Timestamp endTime;
   private String uuid;
   private Timestamp parentLowWatermark;
 
   public PartitionRecord(
-      ByteStringRange partition,
-      Timestamp startTime,
-      String uuid,
-      Timestamp parentLowWatermark,
-      @Nullable Timestamp endTime) {
+      ByteStringRange partition, Timestamp startTime, String uuid, Timestamp 
parentLowWatermark) {
     this.partition = partition;
     this.startTime = startTime;
     this.uuid = uuid;
     this.parentLowWatermark = parentLowWatermark;
-    this.endTime = endTime;
   }
 
   public PartitionRecord(
       ByteStringRange partition,
       List<ChangeStreamContinuationToken> changeStreamContinuationTokens,
       String uuid,
-      Timestamp parentLowWatermark,
-      @Nullable Timestamp endTime) {
+      Timestamp parentLowWatermark) {
     this.partition = partition;
     this.changeStreamContinuationTokens = changeStreamContinuationTokens;
     this.uuid = uuid;
     this.parentLowWatermark = parentLowWatermark;
-    this.endTime = endTime;
-  }
-
-  @Nullable
-  public Timestamp getEndTime() {
-    return endTime;
-  }
-
-  public void setEndTime(@Nullable Timestamp endTime) {
-    this.endTime = endTime;
   }
 
   @Nullable
@@ -135,7 +118,6 @@ public class PartitionRecord implements Serializable {
         && Objects.equals(getStartTime(), that.getStartTime())
         && Objects.equals(
             getChangeStreamContinuationTokens(), 
that.getChangeStreamContinuationTokens())
-        && Objects.equals(getEndTime(), that.getEndTime())
         && getUuid().equals(that.getUuid())
         && Objects.equals(getParentLowWatermark(), 
that.getParentLowWatermark());
   }
@@ -146,7 +128,6 @@ public class PartitionRecord implements Serializable {
         getPartition(),
         getStartTime(),
         getChangeStreamContinuationTokens(),
-        getEndTime(),
         getUuid(),
         getParentLowWatermark());
   }
@@ -160,8 +141,6 @@ public class PartitionRecord implements Serializable {
         + startTime
         + ", changeStreamContinuationTokens="
         + changeStreamContinuationTokens
-        + ", endTime="
-        + endTime
         + ", uuid='"
         + uuid
         + '\''
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
index b3f86c9be10..c8d88f2af75 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
@@ -68,7 +68,6 @@ public class DetectNewPartitionsActionTest {
   private MetadataTableDao metadataTableDao;
   private ManualWatermarkEstimator<Instant> watermarkEstimator;
   private Timestamp startTime;
-  private Timestamp endTime;
   private static BigtableDataClient dataClient;
   private static BigtableTableAdminClient adminClient;
 
@@ -104,10 +103,8 @@ public class DetectNewPartitionsActionTest {
             metadataTableAdminDao.getChangeStreamNamePrefix());
 
     startTime = Timestamp.now();
-    endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10, 
startTime.getNanos());
     action =
-        new DetectNewPartitionsAction(
-            metrics, metadataTableDao, endTime, 
generateInitialPartitionsAction);
+        new DetectNewPartitionsAction(metrics, metadataTableDao, 
generateInitialPartitionsAction);
     watermarkEstimator = new 
WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
index c407ec70e63..eddc68a3c3f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
@@ -66,7 +66,6 @@ public class GenerateInitialPartitionsActionTest {
 
   private ManualWatermarkEstimator<Instant> watermarkEstimator;
   private Timestamp startTime;
-  private Timestamp endTime;
   private static BigtableTableAdminClient adminClient;
 
   @Captor ArgumentCaptor<PartitionRecord> partitionRecordArgumentCaptor;
@@ -89,7 +88,6 @@ public class GenerateInitialPartitionsActionTest {
             adminClient, null, changeStreamId, 
MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME);
     metadataTableAdminDao.createMetadataTable();
     startTime = Timestamp.now();
-    endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10, 
startTime.getNanos());
     watermarkEstimator = new 
WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
   }
 
@@ -103,7 +101,7 @@ public class GenerateInitialPartitionsActionTest {
     
when(changeStreamDao.generateInitialChangeStreamPartitions()).thenReturn(partitionRecordList);
 
     GenerateInitialPartitionsAction generateInitialPartitionsAction =
-        new GenerateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+        new GenerateInitialPartitionsAction(metrics, changeStreamDao);
     assertEquals(
         ProcessContinuation.resume(),
         generateInitialPartitionsAction.run(receiver, tracker, 
watermarkEstimator, startTime));

Reply via email to