[ 
https://issues.apache.org/jira/browse/FLINK-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561643#comment-16561643
 ] 

ASF GitHub Bot commented on FLINK-9900:
---------------------------------------

zentol closed pull request #6395: [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase
URL: https://github.com/apache/flink/pull/6395
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index e02ed010242..b83f89e4ca5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -31,7 +31,12 @@
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -56,6 +61,12 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -63,6 +74,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.core.Is.is;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -107,6 +119,10 @@ public static void setup() throws Exception {
                config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
                config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
+               config.setString(
+                       ConfigConstants.METRICS_REPORTER_PREFIX + "restarts." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
+                       RestartReporter.class.getName());
+
                // we have to manage this manually because we have to create 
the ZooKeeper server
                // ahead of this
                miniClusterResource = new MiniClusterResource(
@@ -184,58 +200,59 @@ public void testRestoreBehaviourWithFaultyStateHandles() 
throws Exception {
                // wait until we did some checkpoints
                waitForCheckpointLatch.await();
 
+               log.debug("Messing with HA directory");
                // mess with the HA directory so that the job cannot restore
                File movedCheckpointLocation = TEMPORARY_FOLDER.newFolder();
-               int numCheckpoints = 0;
-               File[] files = haStorageDir.listFiles();
-               assertNotNull(files);
-               for (File file : files) {
-                       if (file.getName().startsWith("completedCheckpoint")) {
-                               assertTrue(file.renameTo(new 
File(movedCheckpointLocation, file.getName())));
-                               numCheckpoints++;
+               AtomicInteger numCheckpoints = new AtomicInteger();
+               Files.walkFileTree(haStorageDir.toPath(), new 
SimpleFileVisitor<Path>() {
+                       @Override
+                       public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) {
+                               if 
(file.getFileName().toString().startsWith("completedCheckpoint")) {
+                                       log.debug("Moving original checkpoint 
file {}.", file);
+                                       try {
+                                               Files.move(file, 
movedCheckpointLocation.toPath().resolve(file.getFileName()));
+                                               
numCheckpoints.incrementAndGet();
+                                       } catch (IOException ioe) {
+                                               // previous checkpoint files 
may be deleted asynchronously
+                                               log.debug("Exception while 
moving HA files.", ioe);
+                                       }
+                               }
+                               return FileVisitResult.CONTINUE;
                        }
-               }
+               });
+
                // Note to future developers: This will break when we change 
Flink to not put the
                // checkpoint metadata into the HA directory but instead rely 
on the fact that the
                // actual checkpoint directory on DFS contains the checkpoint 
metadata. In this case,
                // ZooKeeper will only contain a "handle" (read: String) that 
points to the metadata
                // in DFS. The likely solution will be that we have to go 
directly to ZooKeeper, find
                // out where the checkpoint is stored and mess with that.
-               assertTrue(numCheckpoints > 0);
+               assertTrue(numCheckpoints.get() > 0);
 
+               log.debug("Resuming job");
                failInCheckpointLatch.trigger();
 
-               // Ensure that we see at least one cycle where the job tries to 
restart and fails.
-               CompletableFuture<JobStatus> jobStatusFuture = 
FutureUtils.retrySuccesfulWithDelay(
-                       () -> clusterClient.getJobStatus(jobID),
-                       Time.milliseconds(1),
-                       deadline,
-                       (jobStatus) -> jobStatus == JobStatus.RESTARTING,
-                       TestingUtils.defaultScheduledExecutor());
-               assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
-
-               jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
-                       () -> clusterClient.getJobStatus(jobID),
-                       Time.milliseconds(1),
-                       deadline,
-                       (jobStatus) -> jobStatus == JobStatus.FAILING,
-                       TestingUtils.defaultScheduledExecutor());
-               assertEquals(JobStatus.FAILING, jobStatusFuture.get());
+               assertNotNull("fullRestarts metric could not be accessed.", 
RestartReporter.numRestarts);
+               while (RestartReporter.numRestarts.getValue() < 5 && 
deadline.hasTimeLeft()) {
+                       Thread.sleep(50);
+               }
+               assertThat(RestartReporter.numRestarts.getValue(), 
is(greaterThan(4L)));
 
                // move back the HA directory so that the job can restore
                CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
+               log.debug("Restored zookeeper");
 
-               files = movedCheckpointLocation.listFiles();
-               assertNotNull(files);
-               for (File file : files) {
-                       if (file.getName().startsWith("completedCheckpoint")) {
-                               assertTrue(file.renameTo(new File(haStorageDir, 
file.getName())));
+               Files.walkFileTree(movedCheckpointLocation.toPath(), new 
SimpleFileVisitor<Path>() {
+                       @Override
+                       public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) throws IOException {
+                               Files.move(file, 
haStorageDir.toPath().resolve(file.getFileName()));
+                               return FileVisitResult.CONTINUE;
                        }
-               }
+               });
 
                // now the job should be able to go to RUNNING again and then 
eventually to FINISHED,
                // which it only does if it could successfully restore
-               jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+               CompletableFuture<JobStatus> jobStatusFuture = 
FutureUtils.retrySuccesfulWithDelay(
                        () -> clusterClient.getJobStatus(jobID),
                        Time.milliseconds(50),
                        deadline,
@@ -324,4 +341,30 @@ public void initializeState(FunctionInitializationContext 
context) {
                        }
                }
        }
+
+       /**
+        * Reporter that exposes the {@link NumberOfFullRestartsGauge} metric.
+        */
+       public static class RestartReporter implements MetricReporter {
+               static volatile NumberOfFullRestartsGauge numRestarts = null;
+
+               @Override
+               public void open(MetricConfig metricConfig) {
+               }
+
+               @Override
+               public void close() {
+               }
+
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String s, 
MetricGroup metricGroup) {
+                       if (metric instanceof NumberOfFullRestartsGauge) {
+                               numRestarts = (NumberOfFullRestartsGauge) 
metric;
+                       }
+               }
+
+               @Override
+               public void notifyOfRemovedMetric(Metric metric, String s, 
MetricGroup metricGroup) {
+               }
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Failed to testRestoreBehaviourWithFaultyStateHandles 
> (org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase) 
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9900
>                 URL: https://issues.apache.org/jira/browse/FLINK-9900
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.1, 1.6.0
>            Reporter: zhangminglei
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> https://api.travis-ci.org/v3/job/405843617/log.txt
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 124.598 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase
>  
> testRestoreBehaviourWithFaultyStateHandles(org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase)
>  Time elapsed: 120.036 sec <<< ERROR!
>  org.junit.runners.model.TestTimedOutException: test timed out after 120000 
> milliseconds
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.test.checkpointing.ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles(ZooKeeperHighAvailabilityITCase.java:244)
> Results :
> Tests in error: 
>  
> ZooKeeperHighAvailabilityITCase.testRestoreBehaviourWithFaultyStateHandles:244
>  » TestTimedOut
> Tests run: 1453, Failures: 0, Errors: 1, Skipped: 29



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to