This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c4da7c  [FLINK-22014][ha] Make sure that AbstractHaServices delete first the HA data before deleting blobs on closeAndCleanupAllData
9c4da7c is described below

commit 9c4da7c3f30dd755562c63cab1283cf5bfaf782f
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Apr 1 18:57:39 2021 +0200

    [FLINK-22014][ha] Make sure that AbstractHaServices delete first the HA data before deleting blobs on closeAndCleanupAllData
    
    The AbstractHaServices should always first delete the HA data (pointers
    to jobs and checkpoints) before deleting the referenced objects such as
    blobs. That way we ensure that we don't leave the system in an invalid
    state if the process stops abruptly because either all the data is still
    available or the pointers have been cleared and Flink only leaves some
    orphaned blobs. The case where the pointers are still there but not the
    blobs will no longer be possible with this fix.
    
    This closes #15468.
---
 .../highavailability/AbstractHaServices.java       |  17 +-
 .../highavailability/AbstractHaServicesTest.java   | 226 +++++++++++++++++++++
 2 files changed, 239 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index 2d895ed..9a322a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -170,20 +170,29 @@ public abstract class AbstractHaServices implements HighAvailabilityServices {
 
         Throwable exception = null;
 
+        boolean deletedHAData = false;
+
         try {
-            blobStoreService.closeAndCleanupAllData();
-        } catch (Throwable t) {
+            internalCleanup();
+            deletedHAData = true;
+        } catch (Exception t) {
             exception = t;
         }
 
         try {
-            internalCleanup();
+            internalClose();
         } catch (Throwable t) {
             exception = ExceptionUtils.firstOrSuppressed(t, exception);
         }
 
         try {
-            internalClose();
+            if (deletedHAData) {
+                blobStoreService.closeAndCleanupAllData();
+            } else {
+                logger.info(
+                        "Cannot delete HA blobs because we failed to delete the pointers in the HA store.");
+                blobStoreService.close();
+            }
         } catch (Throwable t) {
             exception = ExceptionUtils.firstOrSuppressed(t, exception);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
new file mode 100644
index 0000000..a90dc61
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/** Tests for the {@link AbstractHaServices}. */
+public class AbstractHaServicesTest extends TestLogger {
+
+    /**
+     * Tests that we first delete all pointers from the HA services before deleting the blobs. See
+     * FLINK-22014 for more details.
+     */
+    @Test
+    public void testCloseAndCleanupAllDataDeletesBlobsAfterCleaningUpHAData() throws Exception {
+        final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+
+        final TestingBlobStoreService testingBlobStoreService =
+                new TestingBlobStoreService(closeOperations);
+
+        final TestingHaServices haServices =
+                new TestingHaServices(
+                        new Configuration(),
+                        Executors.directExecutor(),
+                        testingBlobStoreService,
+                        closeOperations,
+                        () -> closeOperations.offer(CloseOperations.HA_CLEANUP));
+
+        haServices.closeAndCleanupAllData();
+
+        assertThat(
+                closeOperations,
+                contains(
+                        CloseOperations.HA_CLEANUP,
+                        CloseOperations.HA_CLOSE,
+                        CloseOperations.BLOB_CLEANUP_AND_CLOSE));
+    }
+
+    /**
+     * Tests that we don't delete the HA blobs if we could not clean up the pointers from the HA
+     * services. See FLINK-22014 for more details.
+     */
+    @Test
+    public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails() {
+        final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+
+        final TestingBlobStoreService testingBlobStoreService =
+                new TestingBlobStoreService(closeOperations);
+
+        final TestingHaServices haServices =
+                new TestingHaServices(
+                        new Configuration(),
+                        Executors.directExecutor(),
+                        testingBlobStoreService,
+                        closeOperations,
+                        () -> {
+                            throw new FlinkException("test exception");
+                        });
+
+        try {
+            haServices.closeAndCleanupAllData();
+            fail("Expected that the close operation fails.");
+        } catch (Exception expected) {
+
+        }
+
+        assertThat(closeOperations, contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE));
+    }
+
+    private enum CloseOperations {
+        HA_CLEANUP,
+        HA_CLOSE,
+        BLOB_CLEANUP_AND_CLOSE,
+        BLOB_CLOSE,
+    }
+
+    private static final class TestingBlobStoreService implements BlobStoreService {
+
+        private final Queue<CloseOperations> closeOperations;
+
+        private TestingBlobStoreService(Queue<CloseOperations> closeOperations) {
+            this.closeOperations = closeOperations;
+        }
+
+        @Override
+        public void closeAndCleanupAllData() {
+            closeOperations.offer(CloseOperations.BLOB_CLEANUP_AND_CLOSE);
+        }
+
+        @Override
+        public void close() throws IOException {
+            closeOperations.offer(CloseOperations.BLOB_CLOSE);
+        }
+
+        @Override
+        public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean delete(JobID jobId, BlobKey blobKey) {
+            return false;
+        }
+
+        @Override
+        public boolean deleteAll(JobID jobId) {
+            return false;
+        }
+
+        @Override
+        public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
+            return false;
+        }
+    }
+
+    private static final class TestingHaServices extends AbstractHaServices {
+
+        private final Queue<? super CloseOperations> closeOperations;
+        private final RunnableWithException internalCleanupRunnable;
+
+        private TestingHaServices(
+                Configuration config,
+                Executor ioExecutor,
+                BlobStoreService blobStoreService,
+                Queue<? super CloseOperations> closeOperations,
+                RunnableWithException internalCleanupRunnable) {
+            super(config, ioExecutor, blobStoreService);
+            this.closeOperations = closeOperations;
+            this.internalCleanupRunnable = internalCleanupRunnable;
+        }
+
+        @Override
+        protected LeaderElectionService createLeaderElectionService(String leaderName) {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected LeaderRetrievalService createLeaderRetrievalService(String leaderName) {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected JobGraphStore createJobGraphStore() throws Exception {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected RunningJobsRegistry createRunningJobsRegistry() {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected void internalClose() {
+            closeOperations.offer(CloseOperations.HA_CLOSE);
+        }
+
+        @Override
+        protected void internalCleanup() throws Exception {
+            internalCleanupRunnable.run();
+        }
+
+        @Override
+        protected String getLeaderNameForResourceManager() {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected String getLeaderNameForDispatcher() {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected String getLeaderNameForJobManager(JobID jobID) {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected String getLeaderNameForRestServer() {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+    }
+}
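
For reference, a minimal, self-contained sketch of the cleanup ordering this patch establishes. HaStore and BlobStore are hypothetical stand-ins used only for illustration; the real logic is the AbstractHaServices.closeAndCleanupAllData() change in the diff above.

// Hypothetical interfaces standing in for Flink's HA and blob services.
interface HaStore {
    void deletePointers() throws Exception; // pointers to jobs/checkpoints
    void close();
}

interface BlobStore {
    void cleanupAllData(); // delete all blobs, then close
    void close();          // close without deleting anything
}

final class CleanupOrderingSketch {
    static void closeAndCleanupAllData(HaStore haStore, BlobStore blobStore) throws Exception {
        boolean deletedHAData = false;
        Exception failure = null;

        // Step 1: delete the HA pointers first.
        try {
            haStore.deletePointers();
            deletedHAData = true;
        } catch (Exception e) {
            failure = e;
        }

        // Step 2: close the HA services in any case.
        haStore.close();

        // Step 3: delete the blobs only if the pointers are gone. If
        // pointer deletion failed, keep the blobs so the surviving
        // pointers still reference valid data; at worst, a successful
        // pointer deletion followed by a crash leaves orphaned blobs.
        if (deletedHAData) {
            blobStore.cleanupAllData();
        } else {
            blobStore.close();
        }

        if (failure != null) {
            throw failure;
        }
    }
}

The design point is that an orphaned blob is harmless garbage, while a pointer to a deleted blob leaves the cluster in an unrecoverable state, so the pointers must be removed first.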
