kennknowles commented on code in PR #28751:
URL: https://github.com/apache/beam/pull/28751#discussion_r1341823099
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java:
##########
@@ -114,50 +116,59 @@ public void tearDown() throws NoSuchFieldException, IllegalAccessException {
}
@Test
- @Ignore("BEAM-12164 Reenable this test when databaseClient.getDialect returns the right message.")
- public void testResourceExhaustedDoesNotRetry() {
+ // Error code UNAVAILABLE is retried repeatedly until the RPC times out.
+ public void testUnavailableExceptionRetries() throws InterruptedException {
+ DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
+ options.setBlockOnRun(false);
+ options.setRunner(DirectRunner.class);
+ Pipeline nonBlockingPipeline = TestPipeline.create(options);
+
mockSpannerService.setExecuteStreamingSqlExecutionTime(
- SimulatedExecutionTime.ofStickyException(Status.RESOURCE_EXHAUSTED.asRuntimeException()));
+ SimulatedExecutionTime.ofStickyException(Status.UNAVAILABLE.asRuntimeException()));
final Timestamp startTimestamp = Timestamp.ofTimeSecondsAndNanos(0, 1000);
final Timestamp endTimestamp =
Timestamp.ofTimeSecondsAndNanos(startTimestamp.getSeconds(),
startTimestamp.getNanos() + 1);
+
try {
- pipeline.apply(
+ nonBlockingPipeline.apply(
SpannerIO.readChangeStream()
.withSpannerConfig(getSpannerConfig())
.withChangeStreamName(TEST_CHANGE_STREAM)
.withMetadataDatabase(TEST_DATABASE)
.withMetadataTable(TEST_TABLE)
.withInclusiveStartAt(startTimestamp)
.withInclusiveEndAt(endTimestamp));
- pipeline.run().waitUntilFinish();
+ PipelineResult result = nonBlockingPipeline.run();
+ while (result.getState() != RUNNING) {
+ Thread.sleep(50);
+ }
+ // The pipeline continues making requests to Spanner to retry the Unavailable errors.
+ assertNull(result.waitUntilFinish(Duration.millis(500)));
} finally {
- thrown.expect(SpannerException.class);
// databaseClient.getDialect does not currently bubble up the correct message.
// Instead, the error returned is: "DEADLINE_EXCEEDED: Operation did not complete "
// "in the given time"
- thrown.expectMessage("RESOURCE_EXHAUSTED - Statement: 'SELECT 'POSTGRESQL' AS DIALECT");
+ thrown.expectMessage("DEADLINE_EXCEEDED: Operation did not complete in the given time");
Review Comment:
It would be fine to make this a flexible match, perhaps just
`DEADLINE_EXCEEDED`. The correctness of SpannerChangeStream does not depend on
the exact wording of the message. The same applies to all the error message
matching in this test.
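
To illustrate the flexible match suggested above, here is a minimal sketch in plain Java, not code from this PR. It checks only for the status code token anywhere in the exception's cause chain, so a reworded message still passes. The class name `FlexibleErrorMatch` and the helper `causeChainContains` are hypothetical names made up for this example. In the test itself, JUnit 4's `ExpectedException.expectMessage(String)` already matches on a substring, so shortening the expected string to `DEADLINE_EXCEEDED` would likely be enough.

```java
// Hypothetical helper for illustration only; not part of Beam or of this PR.
class FlexibleErrorMatch {

  /** Returns true if any message in the cause chain of {@code t} contains {@code token}. */
  static boolean causeChainContains(Throwable t, String token) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      String message = cur.getMessage();
      if (message != null && message.contains(token)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Simulates a wrapped failure whose root cause carries the status code.
    RuntimeException failure =
        new RuntimeException(
            "pipeline failed",
            new IllegalStateException(
                "DEADLINE_EXCEEDED: Operation did not complete in the given time"));

    // Matches on the status code alone, regardless of the trailing text.
    System.out.println(causeChainContains(failure, "DEADLINE_EXCEEDED")); // prints "true"
    // A different status code is still rejected.
    System.out.println(causeChainContains(failure, "RESOURCE_EXHAUSTED")); // prints "false"
  }
}
```

Matching on the code alone keeps the test tied to the behavior that matters (which status surfaces) rather than to the client library's wording.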
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java:
##########
@@ -311,11 +351,9 @@ public void testInvalidRecordReceived() {
pipeline.run().waitUntilFinish();
} finally {
thrown.expect(PipelineExecutionException.class);
- // DatabaseClient.getDialect returns "DEADLINE_EXCEEDED: Operation did not complete in the "
- // given time" even though we mocked it out.
thrown.expectMessage("Field not found");
assertThat(
- mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class), Matchers.equalTo(0));
+ mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class), Matchers.greaterThan(0));
Review Comment:
This is a behavior change from expecting 0 to expecting greater than 0. Just
noting this. I don't have a strong opinion about it.
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java:
##########
@@ -300,9 +330,19 @@ public void testInvalidRecordReceived() {
mockInvalidChangeStreamRecordReceived(startTimestamp, endTimestamp);
try {
+ final SpannerConfig changeStreamConfig =
+ SpannerConfig.create()
Review Comment:
Positive comment - inlining the config also makes the test better, as the
setup should be inline with the result. Great.