This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new d249094  [FLINK-16346][tests] Remove timeout
d249094 is described below

commit d2490948e4b69cb42d48c5518c85402cb2ccae88
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Sep 9 08:37:02 2021 +0200

    [FLINK-16346][tests] Remove timeout
---
 .../runtime/jobmanager/BlobsCleanupITCase.java     | 34 +++++++---------------
 1 file changed, 11 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
index 5d1a48e..38b94cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
@@ -55,19 +54,17 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.net.InetSocketAddress;
 import java.nio.file.NoSuchFileException;
-import java.time.Duration;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
@@ -158,7 +155,6 @@ public class BlobsCleanupITCase extends TestLogger {
     private void testBlobServerCleanup(final TestCase testCase) throws 
Exception {
         final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
         final int numTasks = 2;
-        final Deadline timeout = Deadline.fromNow(Duration.ofSeconds(30L));
 
         final JobGraph jobGraph = createJobGraph(testCase, numTasks);
         final JobID jid = jobGraph.getJobID();
@@ -238,7 +234,7 @@ public class BlobsCleanupITCase extends TestLogger {
         File[] blobDirs = blobBaseDir.listFiles((dir, name) -> 
name.startsWith("blobStore-"));
         assertNotNull(blobDirs);
         for (File blobDir : blobDirs) {
-            waitForEmptyBlobDir(blobDir, timeout.timeLeft());
+            waitForEmptyBlobDir(blobDir);
         }
     }
 
@@ -263,27 +259,19 @@ public class BlobsCleanupITCase extends TestLogger {
      *
      * @param blobDir directory of a {@link 
org.apache.flink.runtime.blob.BlobServer} or {@link
      *     org.apache.flink.runtime.blob.BlobCacheService}
-     * @param remaining remaining time for this test
      * @see org.apache.flink.runtime.blob.BlobUtils
      */
-    private static void waitForEmptyBlobDir(File blobDir, Duration remaining)
-            throws InterruptedException {
-        long deadline = System.currentTimeMillis() + remaining.toMillis();
-        String[] blobDirContents;
+    private static void waitForEmptyBlobDir(File blobDir) throws 
InterruptedException {
         final FilenameFilter jobDirFilter = (dir, name) -> 
name.startsWith("job_");
 
-        do {
-            blobDirContents = blobDir.list(jobDirFilter);
-            if (blobDirContents == null || blobDirContents.length == 0) {
-                return;
-            }
-            Thread.sleep(RETRY_INTERVAL);
-        } while (System.currentTimeMillis() < deadline);
+        final Supplier<Boolean> isDirEmpty =
+                () -> {
+                    String[] blobDirContents = blobDir.list(jobDirFilter);
+                    return blobDirContents == null || blobDirContents.length 
== 0;
+                };
 
-        fail(
-                "Timeout while waiting for "
-                        + blobDir.getAbsolutePath()
-                        + " to become empty. Current contents: "
-                        + Arrays.toString(blobDirContents));
+        while (!isDirEmpty.get()) {
+            Thread.sleep(RETRY_INTERVAL);
+        }
     }
 }

Reply via email to