This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by
this push:
new 331b40117ea Not including support for end time in Bigtable Change
Stream connector (#25474)
331b40117ea is described below
commit 331b40117eac3981ea319265bad18739e1dce297
Author: Tony Tang <[email protected]>
AuthorDate: Wed Feb 15 11:48:49 2023 -0500
Not including support for end time in Bigtable Change Stream connector
(#25474)
---
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 20 ++---------------
.../changestreams/action/ActionFactory.java | 10 +++------
.../action/DetectNewPartitionsAction.java | 16 -------------
.../action/GenerateInitialPartitionsAction.java | 9 ++------
.../action/ReadChangeStreamPartitionAction.java | 26 ++++++----------------
.../changestreams/dao/ChangeStreamDao.java | 8 +------
.../dofn/DetectNewPartitionsDoFn.java | 12 +++-------
.../changestreams/model/PartitionRecord.java | 25 ++-------------------
.../action/DetectNewPartitionsActionTest.java | 5 +----
.../GenerateInitialPartitionsActionTest.java | 4 +---
10 files changed, 22 insertions(+), 113 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index d2eff9cbbfa..00048b73d08 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -222,8 +222,7 @@ import org.slf4j.LoggerFactory;
* .withInstanceId(instanceId)
* .withTableId(tableId)
* .withAppProfileId(appProfileId)
- * .withStartTime(startTime)
- * .withEndTime(endTime));
+ * .withStartTime(startTime));
* }</pre>
*
* <h3>Permissions</h3>
@@ -281,7 +280,6 @@ public class BigtableIO {
*
* <ul>
* <li>{@link BigtableIO.ReadChangeStream#withStartTime} which defaults to
now.
- * <li>{@link BigtableIO.ReadChangeStream#withEndTime} which defaults to
empty.
* <li>{@link BigtableIO.ReadChangeStream#withHeartbeatDuration} with
defaults to 1 seconds.
* <li>{@link BigtableIO.ReadChangeStream#withMetadataTableProjectId}
which defaults to value
* from {@link BigtableIO.ReadChangeStream#withProjectId}
@@ -1544,8 +1542,6 @@ public class BigtableIO {
abstract @Nullable Timestamp getStartTime();
- abstract @Nullable Timestamp getEndTime();
-
abstract @Nullable Duration getHeartbeatDuration();
abstract @Nullable String getChangeStreamName();
@@ -1657,16 +1653,6 @@ public class BigtableIO {
return toBuilder().setStartTime(startTime).build();
}
- /**
- * Returns a new {@link BigtableIO.ReadChangeStream} that will stop
streaming at the specified
- * end time.
- *
- * <p>Does not modify this object.
- */
- public ReadChangeStream withEndTime(Timestamp endTime) {
- return toBuilder().setEndTime(endTime).build();
- }
-
/**
* Returns a new {@link BigtableIO.ReadChangeStream} that will send
heartbeat messages at
* specified interval.
@@ -1800,7 +1786,7 @@ public class BigtableIO {
InitializeDoFn initializeDoFn =
new InitializeDoFn(daoFactory,
metadataTableConfig.getAppProfileId().get(), startTime);
DetectNewPartitionsDoFn detectNewPartitionsDoFn =
- new DetectNewPartitionsDoFn(getEndTime(), actionFactory, daoFactory,
metrics);
+ new DetectNewPartitionsDoFn(actionFactory, daoFactory, metrics);
ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory,
actionFactory, metrics);
@@ -1821,8 +1807,6 @@ public class BigtableIO {
abstract ReadChangeStream.Builder setStartTime(Timestamp startTime);
- abstract ReadChangeStream.Builder setEndTime(Timestamp endTime);
-
abstract ReadChangeStream.Builder setHeartbeatDuration(Duration
interval);
abstract ReadChangeStream.Builder setChangeStreamName(String
changeStreamName);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
index 18fbc5fe404..cfc5b5fd3b2 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
@@ -17,9 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
-import com.google.cloud.Timestamp;
import java.io.Serializable;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
@@ -66,12 +64,10 @@ public class ActionFactory implements Serializable {
public synchronized DetectNewPartitionsAction detectNewPartitionsAction(
ChangeStreamMetrics metrics,
MetadataTableDao metadataTableDao,
- @Nullable Timestamp endTime,
GenerateInitialPartitionsAction generateInitialPartitionsAction) {
if (detectNewPartitionsAction == null) {
detectNewPartitionsAction =
- new DetectNewPartitionsAction(
- metrics, metadataTableDao, endTime,
generateInitialPartitionsAction);
+ new DetectNewPartitionsAction(metrics, metadataTableDao,
generateInitialPartitionsAction);
}
return detectNewPartitionsAction;
}
@@ -85,10 +81,10 @@ public class ActionFactory implements Serializable {
* @return singleton instance of the {@link GenerateInitialPartitionsAction}
*/
public synchronized GenerateInitialPartitionsAction
generateInitialPartitionsAction(
- ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao, @Nullable
Timestamp endTime) {
+ ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao) {
if (generateInitialPartitionsAction == null) {
generateInitialPartitionsAction =
- new GenerateInitialPartitionsAction(metrics, changeStreamDao,
endTime);
+ new GenerateInitialPartitionsAction(metrics, changeStreamDao);
}
return generateInitialPartitionsAction;
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
index 932796d4ba8..4e286fe6397 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
@@ -19,9 +19,7 @@ package
org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
import com.google.cloud.Timestamp;
import com.google.protobuf.InvalidProtocolBufferException;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.range.OffsetRange;
@@ -55,17 +53,14 @@ public class DetectNewPartitionsAction {
private final ChangeStreamMetrics metrics;
private final MetadataTableDao metadataTableDao;
- @Nullable private final com.google.cloud.Timestamp endTime;
private final GenerateInitialPartitionsAction
generateInitialPartitionsAction;
public DetectNewPartitionsAction(
ChangeStreamMetrics metrics,
MetadataTableDao metadataTableDao,
- @Nullable Timestamp endTime,
GenerateInitialPartitionsAction generateInitialPartitionsAction) {
this.metrics = metrics;
this.metadataTableDao = metadataTableDao;
- this.endTime = endTime;
this.generateInitialPartitionsAction = generateInitialPartitionsAction;
}
@@ -77,7 +72,6 @@ public class DetectNewPartitionsAction {
* <li>Look up the initial list of partitions to stream if it's the very
first run.
* <li>On rest of the runs, try advancing watermark if needed.
* <li>Update the metadata table with info about this DoFn.
- * <li>Check if this pipeline has reached the end time. Terminate if it
has.
* <li>Process new partitions and output them.
* <li>Register callback to clean up processed partitions after bundle has
been finalized.
* </ol>
@@ -103,16 +97,6 @@ public class DetectNewPartitionsAction {
return generateInitialPartitionsAction.run(receiver, tracker,
watermarkEstimator, startTime);
}
- // Terminate if endTime <= watermark that means all partitions have read
up to or beyond
- // watermark. We no longer need to manage splits and merges, we can
terminate.
- if (endTime != null
- && endTime.compareTo(
-
TimestampConverter.toCloudTimestamp(watermarkEstimator.currentWatermark()))
- <= 0) {
- tracker.tryClaim(tracker.currentRestriction().getTo());
- return ProcessContinuation.stop();
- }
-
if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
return ProcessContinuation.stop();
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
index 164a679a26f..938464f500a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
@@ -17,10 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
-import com.google.cloud.Timestamp;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import java.util.List;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
@@ -44,13 +42,11 @@ public class GenerateInitialPartitionsAction {
private final ChangeStreamMetrics metrics;
private final ChangeStreamDao changeStreamDao;
- @Nullable private final Timestamp endTime;
public GenerateInitialPartitionsAction(
- ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao, @Nullable
Timestamp endTime) {
+ ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao) {
this.metrics = metrics;
this.changeStreamDao = changeStreamDao;
- this.endTime = endTime;
}
/**
@@ -78,8 +74,7 @@ public class GenerateInitialPartitionsAction {
for (ByteStringRange partition : streamPartitions) {
metrics.incListPartitionsCount();
String uid = UniqueIdGenerator.getNextId();
- PartitionRecord partitionRecord =
- new PartitionRecord(partition, startTime, uid, startTime, endTime);
+ PartitionRecord partitionRecord = new PartitionRecord(partition,
startTime, uid, startTime);
// We are outputting elements with timestamp of 0 to prevent reliance on
event time. This
// limits the ability to window on commit time of any data changes. It
is still possible to
// window on processing time.
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
index 321a3c1a1ca..a36b57ff42f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
@@ -85,20 +85,12 @@ public class ReadChangeStreamPartitionAction {
* <li>Process CloseStream if it exists. In order to solve a possible
inconsistent state
* problem, we do not process CloseStream after receiving it. We claim
the CloseStream in
* the RestrictionTracker so it persists after a checkpoint. We
checkpoint to flush all the
- * DataChanges. Then on resume, we process the CloseStream. There are
only 2 expected Status
- * for CloseStream: OK and Out of Range.
- * <ol>
- * <li>OK status is returned when the predetermined endTime has been
reached. In this
- * case, we update the watermark and update the metadata table.
{@link
- * DetectNewPartitionsDoFn} aggregates the watermark from all
the streams to ensure
- * all the streams have reached beyond endTime so it can also
terminate and end the
- * beam job.
- * <li>Out of Range is returned when the partition has either been
split into more
- * partitions or merged into a larger partition. In this case,
we write to the
- * metadata table the new partitions' information so that {@link
- * DetectNewPartitionsDoFn} can read and output those new
partitions to be streamed.
- * We also need to ensure we clean up this partition's metadata
to release the lock.
- * </ol>
+ * DataChanges. Then on resume, we process the CloseStream. There is
only 1 expected Status
+ * for CloseStream: Out of Range. Out of Range is returned when the
partition has either
+ * been split into more partitions or merged into a larger partition.
In this case, we write
+ * to the metadata table the new partitions' information so that {@link
+ * DetectNewPartitionsDoFn} can read and output those new partitions
to be streamed. We also
+ * need to ensure we clean up this partition's metadata to release the
lock.
* <li>Update the metadata table with the watermark and additional
debugging info.
* <li>Stream the partition.
* </ol>
@@ -145,11 +137,7 @@ public class ReadChangeStreamPartitionAction {
try {
stream =
changeStreamDao.readChangeStreamPartition(
- partitionRecord,
- tracker.currentRestriction(),
- partitionRecord.getEndTime(),
- heartbeatDurationSeconds,
- shouldDebug);
+ partitionRecord, tracker.currentRestriction(),
heartbeatDurationSeconds, shouldDebug);
for (ChangeStreamRecord record : stream) {
Optional<ProcessContinuation> result =
changeStreamAction.run(
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
index 98902866cb1..3cbd0fb7902 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
@@ -31,7 +31,6 @@ import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import
org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,8 +61,7 @@ public class ChangeStreamDao {
* Streams a partition.
*
* @param partition the partition to stream
- * @param streamProgress may contain a continuation token for the stream
request
- * @param endTime time to end the stream, may be null
+ * @param streamProgress may contain a continuation token for the stream
request\
* @param heartbeatDurationSeconds period between heartbeat messages
* @return stream of ReadChangeStreamResponse
* @throws IOException if the stream could not be started
@@ -71,7 +69,6 @@ public class ChangeStreamDao {
public ServerStream<ChangeStreamRecord> readChangeStreamPartition(
PartitionRecord partition,
StreamProgress streamProgress,
- @Nullable Timestamp endTime,
Duration heartbeatDurationSeconds,
boolean shouldDebug)
throws IOException {
@@ -92,9 +89,6 @@ public class ChangeStreamDao {
} else {
throw new IOException("Something went wrong");
}
- if (endTime != null) {
- query.endTime(endTime.toProto());
- }
query.heartbeatDuration(heartbeatDurationSeconds.getStandardSeconds());
if (shouldDebug) {
LOG.info(
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
index 6e8f21cd00b..12f95ac160d 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
import java.io.IOException;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
@@ -42,7 +41,6 @@ import org.joda.time.Instant;
@UnboundedPerElement
public class DetectNewPartitionsDoFn extends DoFn<com.google.cloud.Timestamp,
PartitionRecord> {
private static final long serialVersionUID = 8052524268978107367L;
- @Nullable private final com.google.cloud.Timestamp endTime;
private final DaoFactory daoFactory;
private final ChangeStreamMetrics metrics;
@@ -50,13 +48,9 @@ public class DetectNewPartitionsDoFn extends
DoFn<com.google.cloud.Timestamp, Pa
private DetectNewPartitionsAction detectNewPartitionsAction;
public DetectNewPartitionsDoFn(
- @Nullable com.google.cloud.Timestamp endTime,
- ActionFactory actionFactory,
- DaoFactory daoFactory,
- ChangeStreamMetrics metrics) {
+ ActionFactory actionFactory, DaoFactory daoFactory, ChangeStreamMetrics
metrics) {
this.actionFactory = actionFactory;
this.daoFactory = daoFactory;
- this.endTime = endTime;
this.metrics = metrics;
}
@@ -92,10 +86,10 @@ public class DetectNewPartitionsDoFn extends
DoFn<com.google.cloud.Timestamp, Pa
final MetadataTableDao metadataTableDao = daoFactory.getMetadataTableDao();
final ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao();
GenerateInitialPartitionsAction generateInitialPartitionsAction =
- actionFactory.generateInitialPartitionsAction(metrics,
changeStreamDao, endTime);
+ actionFactory.generateInitialPartitionsAction(metrics,
changeStreamDao);
detectNewPartitionsAction =
actionFactory.detectNewPartitionsAction(
- metrics, metadataTableDao, endTime,
generateInitialPartitionsAction);
+ metrics, metadataTableDao, generateInitialPartitionsAction);
}
@ProcessElement
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
index 15ebf4c8fd6..c94011c8f7a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
@@ -40,43 +40,26 @@ public class PartitionRecord implements Serializable {
private ByteStringRange partition;
@Nullable private Timestamp startTime;
@Nullable private List<ChangeStreamContinuationToken>
changeStreamContinuationTokens;
- @Nullable private Timestamp endTime;
private String uuid;
private Timestamp parentLowWatermark;
public PartitionRecord(
- ByteStringRange partition,
- Timestamp startTime,
- String uuid,
- Timestamp parentLowWatermark,
- @Nullable Timestamp endTime) {
+ ByteStringRange partition, Timestamp startTime, String uuid, Timestamp
parentLowWatermark) {
this.partition = partition;
this.startTime = startTime;
this.uuid = uuid;
this.parentLowWatermark = parentLowWatermark;
- this.endTime = endTime;
}
public PartitionRecord(
ByteStringRange partition,
List<ChangeStreamContinuationToken> changeStreamContinuationTokens,
String uuid,
- Timestamp parentLowWatermark,
- @Nullable Timestamp endTime) {
+ Timestamp parentLowWatermark) {
this.partition = partition;
this.changeStreamContinuationTokens = changeStreamContinuationTokens;
this.uuid = uuid;
this.parentLowWatermark = parentLowWatermark;
- this.endTime = endTime;
- }
-
- @Nullable
- public Timestamp getEndTime() {
- return endTime;
- }
-
- public void setEndTime(@Nullable Timestamp endTime) {
- this.endTime = endTime;
}
@Nullable
@@ -135,7 +118,6 @@ public class PartitionRecord implements Serializable {
&& Objects.equals(getStartTime(), that.getStartTime())
&& Objects.equals(
getChangeStreamContinuationTokens(),
that.getChangeStreamContinuationTokens())
- && Objects.equals(getEndTime(), that.getEndTime())
&& getUuid().equals(that.getUuid())
&& Objects.equals(getParentLowWatermark(),
that.getParentLowWatermark());
}
@@ -146,7 +128,6 @@ public class PartitionRecord implements Serializable {
getPartition(),
getStartTime(),
getChangeStreamContinuationTokens(),
- getEndTime(),
getUuid(),
getParentLowWatermark());
}
@@ -160,8 +141,6 @@ public class PartitionRecord implements Serializable {
+ startTime
+ ", changeStreamContinuationTokens="
+ changeStreamContinuationTokens
- + ", endTime="
- + endTime
+ ", uuid='"
+ uuid
+ '\''
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
index b3f86c9be10..c8d88f2af75 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
@@ -68,7 +68,6 @@ public class DetectNewPartitionsActionTest {
private MetadataTableDao metadataTableDao;
private ManualWatermarkEstimator<Instant> watermarkEstimator;
private Timestamp startTime;
- private Timestamp endTime;
private static BigtableDataClient dataClient;
private static BigtableTableAdminClient adminClient;
@@ -104,10 +103,8 @@ public class DetectNewPartitionsActionTest {
metadataTableAdminDao.getChangeStreamNamePrefix());
startTime = Timestamp.now();
- endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10,
startTime.getNanos());
action =
- new DetectNewPartitionsAction(
- metrics, metadataTableDao, endTime,
generateInitialPartitionsAction);
+ new DetectNewPartitionsAction(metrics, metadataTableDao,
generateInitialPartitionsAction);
watermarkEstimator = new
WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
index c407ec70e63..eddc68a3c3f 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
@@ -66,7 +66,6 @@ public class GenerateInitialPartitionsActionTest {
private ManualWatermarkEstimator<Instant> watermarkEstimator;
private Timestamp startTime;
- private Timestamp endTime;
private static BigtableTableAdminClient adminClient;
@Captor ArgumentCaptor<PartitionRecord> partitionRecordArgumentCaptor;
@@ -89,7 +88,6 @@ public class GenerateInitialPartitionsActionTest {
adminClient, null, changeStreamId,
MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME);
metadataTableAdminDao.createMetadataTable();
startTime = Timestamp.now();
- endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10,
startTime.getNanos());
watermarkEstimator = new
WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
}
@@ -103,7 +101,7 @@ public class GenerateInitialPartitionsActionTest {
when(changeStreamDao.generateInitialChangeStreamPartitions()).thenReturn(partitionRecordList);
GenerateInitialPartitionsAction generateInitialPartitionsAction =
- new GenerateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+ new GenerateInitialPartitionsAction(metrics, changeStreamDao);
assertEquals(
ProcessContinuation.resume(),
generateInitialPartitionsAction.run(receiver, tracker,
watermarkEstimator, startTime));