nancyxu123 commented on code in PR #28751:
URL: https://github.com/apache/beam/pull/28751#discussion_r1342958076


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java:
##########
@@ -114,50 +116,59 @@ public void tearDown() throws NoSuchFieldException, IllegalAccessException {
   }
 
   @Test
-  @Ignore("BEAM-12164 Reenable this test when databaseClient.getDialect returns the right message.")
-  public void testResourceExhaustedDoesNotRetry() {
+  // Error code UNAVAILABLE is retried repeatedly until the RPC times out.
+  public void testUnavailableExceptionRetries() throws InterruptedException {
+    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
+    options.setBlockOnRun(false);
+    options.setRunner(DirectRunner.class);
+    Pipeline nonBlockingPipeline = TestPipeline.create(options);
+
     mockSpannerService.setExecuteStreamingSqlExecutionTime(
-        SimulatedExecutionTime.ofStickyException(Status.RESOURCE_EXHAUSTED.asRuntimeException()));
+        SimulatedExecutionTime.ofStickyException(Status.UNAVAILABLE.asRuntimeException()));
 
     final Timestamp startTimestamp = Timestamp.ofTimeSecondsAndNanos(0, 1000);
     final Timestamp endTimestamp =
         Timestamp.ofTimeSecondsAndNanos(startTimestamp.getSeconds(), startTimestamp.getNanos() + 1);
+
     try {
-      pipeline.apply(
+      nonBlockingPipeline.apply(
           SpannerIO.readChangeStream()
               .withSpannerConfig(getSpannerConfig())
               .withChangeStreamName(TEST_CHANGE_STREAM)
               .withMetadataDatabase(TEST_DATABASE)
               .withMetadataTable(TEST_TABLE)
               .withInclusiveStartAt(startTimestamp)
               .withInclusiveEndAt(endTimestamp));
-      pipeline.run().waitUntilFinish();
+      PipelineResult result = nonBlockingPipeline.run();
+      while (result.getState() != RUNNING) {
+        Thread.sleep(50);
+      }
+      // The pipeline continues making requests to Spanner to retry the Unavailable errors.
+      assertNull(result.waitUntilFinish(Duration.millis(500)));
     } finally {
-      thrown.expect(SpannerException.class);
       // databaseClient.getDialect does not currently bubble up the correct message.
       // Instead, the error returned is: "DEADLINE_EXCEEDED: Operation did not complete "
       // "in the given time"
-      thrown.expectMessage("RESOURCE_EXHAUSTED - Statement: 'SELECT 'POSTGRESQL' AS DIALECT");
+      thrown.expectMessage("DEADLINE_EXCEEDED: Operation did not complete in the given time");

Review Comment:
   Sure thing, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to