scwhittle commented on code in PR #37718:
URL: https://github.com/apache/beam/pull/37718#discussion_r2917180973
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java:
##########
@@ -96,6 +99,11 @@ public Optional<ProcessContinuation> run(
watermarkEstimator.setWatermark(timestampInstant);
LOG.debug("[{}] Heartbeat record action completed successfully", token);
- return Optional.empty();
+ if (timestamp.equals(endTimestamp)) {
+ // this is probably last element in query, let it finish query
+ return Optional.empty();
+ }
+ // no new data, finish reading data
+ return Optional.of(ProcessContinuation.resume());
Review Comment:
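This branch doesn't consult `cancelQueryOnHeartbeat`. It returns `resume()` on every heartbeat that isn't at the end timestamp, regardless of whether cancel-on-heartbeat is configured.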
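A minimal, dependency-free sketch of how the action could consult the flag. It assumes `cancelQueryOnHeartbeat` is a boolean available to the action, and that leaving it off should keep the previous `Optional.empty()` behavior. `Continuation` is a hypothetical stand-in for Beam's `ProcessContinuation`, and timestamps are plain microsecond `long`s rather than Spanner `Timestamp`s.

```java
import java.util.Optional;

/** Hypothetical sketch of the heartbeat decision; not the Beam implementation. */
final class HeartbeatDecision {
  /** Hypothetical stand-in for ProcessContinuation.resume(). */
  enum Continuation { RESUME }

  /**
   * Returns Optional.empty() to keep reading the current query, or RESUME to
   * stop the query and checkpoint.
   */
  static Optional<Continuation> onHeartbeat(
      long timestampMicros, long endTimestampMicros, boolean cancelQueryOnHeartbeat) {
    if (!cancelQueryOnHeartbeat) {
      // Flag off: keep the pre-existing behavior and let the query continue.
      return Optional.empty();
    }
    if (timestampMicros == endTimestampMicros) {
      // Probably the last record of the query; let the query finish on its own.
      return Optional.empty();
    }
    // Flag on and no new data: stop the query and resume later from a checkpoint.
    return Optional.of(Continuation.RESUME);
  }

  public static void main(String[] args) {
    System.out.println(onHeartbeat(5L, 10L, true));  // Optional[RESUME]
    System.out.println(onHeartbeat(5L, 10L, false)); // Optional.empty
    System.out.println(onHeartbeat(10L, 10L, true)); // Optional.empty
  }
}
```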
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1912,6 +1936,17 @@ public ReadChangeStream withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}
+ /**
+ * Configures the change stream to checkpoint and flush output targeting low latency at the cost
+ * of higher rpc rate and cpu usage.
+ */
+ public ReadChangeStream withLowLatency() {
+ return toBuilder()
+ .setHeartbeatMillis(DEFAULT_LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS)
Review Comment:
I think we should set both the real-time end timestamp and the heartbeat.
The heartbeat may not fire if data trickles in every 50 ms, in which case the
bundle could run until the runner's split time (5 s by default). Since
end-to-end processing requires the bundle to finish and commit, a 1 s
real-time end timeout would improve end-to-end latency in that case.
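To make the latency argument concrete, here is a toy model under stated assumptions: a heartbeat is emitted only after a gap of at least `heartbeatMillis` with no data, and a bundle ends at the earliest of heartbeat, real-time end timeout, or runner split. `BundleLatencyModel`, `bundleEndMillis`, and the 100 ms heartbeat are hypothetical illustrations, not Beam code and not the value of `DEFAULT_LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS`.

```java
/** Hypothetical model of when a change-stream bundle finishes; not Beam code. */
final class BundleLatencyModel {
  /** Marker for "never happens" / "not configured". */
  static final long NEVER = Long.MAX_VALUE;

  static long bundleEndMillis(
      long dataIntervalMillis,
      long heartbeatMillis,
      long realTimeEndTimeoutMillis, // NEVER if not configured
      long runnerSplitMillis) {
    // Assumption: data arriving more often than the heartbeat interval suppresses heartbeats.
    long heartbeatAt = dataIntervalMillis >= heartbeatMillis ? heartbeatMillis : NEVER;
    return Math.min(heartbeatAt, Math.min(realTimeEndTimeoutMillis, runnerSplitMillis));
  }

  public static void main(String[] args) {
    long heartbeatMillis = 100; // illustrative value only
    long split = 5_000; // runner split time default cited in the review
    // Data every 50 ms, heartbeat only: the bundle waits for the runner split.
    System.out.println(bundleEndMillis(50, heartbeatMillis, NEVER, split)); // 5000
    // Same traffic with a 1 s real-time end timeout: the bundle ends after 1 s.
    System.out.println(bundleEndMillis(50, heartbeatMillis, 1_000, split)); // 1000
  }
}
```

Under these assumptions, steady 50 ms traffic suppresses the heartbeat entirely, so only the real-time end timeout caps the bundle below the split time.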
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java:
##########
@@ -111,9 +118,54 @@ public void testSoftDeadlineReached() {
new HeartbeatRecord(timestamp, null),
tracker,
interrupter,
- watermarkEstimator);
+ watermarkEstimator,
+ endTimestamp);
assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
verify(watermarkEstimator, never()).setWatermark(any());
}
+
+ @Test
+ public void testEndTimestampReachedOnCancellingAction() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
+
+ when(tracker.tryClaim(timestamp)).thenReturn(true);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ cancellingAction.run(
+ partition,
+ new HeartbeatRecord(timestamp, null),
+ tracker,
+ interrupter,
+ watermarkEstimator,
+ endTimestamp);
+
+ assertEquals(Optional.empty(), maybeContinuation);
+ verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime()));
+ }
+
+ @Test
+ public void testEndTimestampNotReachedOnCancellingAction() {
Review Comment:
Test both with and without the configuration that cancels the query on
heartbeat.
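One way to cover both configurations is a small table-driven check, sketched here with a hypothetical `Action` stand-in built with and without cancel-on-heartbeat (mirroring `cancellingAction` above). It assumes the flag-off action keeps returning `Optional.empty()`; the real test would use the mocked `tracker`, `interrupter`, and `watermarkEstimator` with JUnit assertions instead.

```java
import java.util.Optional;

/** Hypothetical table-driven check over both configurations; not the Beam test. */
final class HeartbeatActionMatrix {
  enum Continuation { RESUME }

  /** Stand-in for an action built with or without cancel-on-heartbeat. */
  static final class Action {
    private final boolean cancelQueryOnHeartbeat;

    Action(boolean cancelQueryOnHeartbeat) {
      this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
    }

    Optional<Continuation> run(long timestampMicros, long endTimestampMicros) {
      // Resume only when the flag is on and the query has not reached its end.
      if (cancelQueryOnHeartbeat && timestampMicros != endTimestampMicros) {
        return Optional.of(Continuation.RESUME);
      }
      return Optional.empty();
    }
  }

  // Each row: {cancelQueryOnHeartbeat, endTimestampReached, expectResume}.
  static final boolean[][] CASES = {
    {false, false, false},
    {false, true, false},
    {true, false, true},
    {true, true, false},
  };

  /** Checks every row and returns how many passed; throws on the first mismatch. */
  static int runMatrix() {
    int passed = 0;
    for (boolean[] c : CASES) {
      Action action = new Action(c[0]);
      long timestamp = 10L;
      long endTimestamp = c[1] ? 10L : 20L;
      boolean resumed = action.run(timestamp, endTimestamp).isPresent();
      if (resumed != c[2]) {
        throw new AssertionError(
            "cancel=" + c[0] + ", endReached=" + c[1] + ": expected resume=" + c[2]);
      }
      passed++;
    }
    return passed;
  }

  public static void main(String[] args) {
    System.out.println(runMatrix() + " cases passed"); // 4 cases passed
  }
}
```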
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java:
##########
@@ -103,7 +104,9 @@ public void setUp() {
partitionEventRecordAction = mock(PartitionEventRecordAction.class);
queryChangeStreamAction = mock(QueryChangeStreamAction.class);
- doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
+ doFn =
+ new ReadChangeStreamPartitionDoFn(
+ daoFactory, mapperFactory, actionFactory, metrics, Duration.standardMinutes(2), false);
Review Comment:
Could you add a named constant for this `false` and reuse it below when
constructing the actions as well?
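A minimal sketch of the suggestion, assuming the trailing `false` is the cancel-on-heartbeat flag. `CANCEL_QUERY_ON_HEARTBEAT`, `PartitionDoFn`, and `HeartbeatAction` are hypothetical stand-ins, and the other constructor arguments are omitted.

```java
/** Hypothetical sketch: one named flag shared by the DoFn and the actions in a test setup. */
final class ConstantReuseSketch {
  // Named once; both the DoFn and the actions read the same value.
  static final boolean CANCEL_QUERY_ON_HEARTBEAT = false;

  /** Stand-in for ReadChangeStreamPartitionDoFn (other constructor arguments omitted). */
  static final class PartitionDoFn {
    final boolean cancelQueryOnHeartbeat;

    PartitionDoFn(boolean cancelQueryOnHeartbeat) {
      this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
    }
  }

  /** Stand-in for an action such as HeartbeatRecordAction. */
  static final class HeartbeatAction {
    final boolean cancelQueryOnHeartbeat;

    HeartbeatAction(boolean cancelQueryOnHeartbeat) {
      this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
    }
  }

  static PartitionDoFn newDoFn() {
    return new PartitionDoFn(CANCEL_QUERY_ON_HEARTBEAT);
  }

  static HeartbeatAction newHeartbeatAction() {
    return new HeartbeatAction(CANCEL_QUERY_ON_HEARTBEAT);
  }

  public static void main(String[] args) {
    // Both objects are guaranteed to agree on the flag.
    System.out.println(
        newDoFn().cancelQueryOnHeartbeat == newHeartbeatAction().cancelQueryOnHeartbeat); // true
  }
}
```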
--
This is an automated message from the Apache Git Service.