This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 9c4da7c  [FLINK-22014][ha] Make sure that AbstractHaServices delete first the HA data before deleting blobs on closeAndCleanupAllData
9c4da7c is described below

commit 9c4da7c3f30dd755562c63cab1283cf5bfaf782f
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Apr 1 18:57:39 2021 +0200

    [FLINK-22014][ha] Make sure that AbstractHaServices delete first the HA data before deleting blobs on closeAndCleanupAllData

    The AbstractHaServices should always first delete the HA data (the pointers to jobs and checkpoints) before deleting the referenced objects such as blobs. That way we ensure that we don't leave the system in an invalid state if the process stops abruptly: either all the data is still available, or the pointers have been cleared and Flink leaves behind at most some orphaned blobs. The case where the pointers are still there but the blobs are not will no longer be possible with this fix.

    This closes #15468.
---
 .../highavailability/AbstractHaServices.java       |  17 +-
 .../highavailability/AbstractHaServicesTest.java   | 226 +++++++++++++++++++++
 2 files changed, 239 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index 2d895ed..9a322a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -170,20 +170,29 @@ public abstract class AbstractHaServices implements HighAvailabilityServices {

         Throwable exception = null;

+        boolean deletedHAData = false;
+
         try {
-            blobStoreService.closeAndCleanupAllData();
-        } catch (Throwable t) {
+            internalCleanup();
+            deletedHAData = true;
+        } catch (Exception t) {
             exception = t;
         }

         try {
-            internalCleanup();
+            internalClose();
         } catch (Throwable t) {
             exception = ExceptionUtils.firstOrSuppressed(t, exception);
         }

         try {
-            internalClose();
+            if (deletedHAData) {
+                blobStoreService.closeAndCleanupAllData();
+            } else {
+                logger.info(
+                        "Cannot delete HA blobs because we failed to delete the pointers in the HA store.");
+                blobStoreService.close();
+            }
         } catch (Throwable t) {
             exception = ExceptionUtils.firstOrSuppressed(t, exception);
         }

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
new file mode 100644
index 0000000..a90dc61
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/** Tests for the {@link AbstractHaServices}. */
+public class AbstractHaServicesTest extends TestLogger {
+
+    /**
+     * Tests that we first delete all pointers from the HA services before deleting the blobs. See
+     * FLINK-22014 for more details.
+     */
+    @Test
+    public void testCloseAndCleanupAllDataDeletesBlobsAfterCleaningUpHAData() throws Exception {
+        final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+
+        final TestingBlobStoreService testingBlobStoreService =
+                new TestingBlobStoreService(closeOperations);
+
+        final TestingHaServices haServices =
+                new TestingHaServices(
+                        new Configuration(),
+                        Executors.directExecutor(),
+                        testingBlobStoreService,
+                        closeOperations,
+                        () -> closeOperations.offer(CloseOperations.HA_CLEANUP));
+
+        haServices.closeAndCleanupAllData();
+
+        assertThat(
+                closeOperations,
+                contains(
+                        CloseOperations.HA_CLEANUP,
+                        CloseOperations.HA_CLOSE,
+                        CloseOperations.BLOB_CLEANUP_AND_CLOSE));
+    }
+
+    /**
+     * Tests that we don't delete the HA blobs if we could not clean up the pointers from the HA
+     * services. See FLINK-22014 for more details.
+     */
+    @Test
+    public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails() {
+        final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+
+        final TestingBlobStoreService testingBlobStoreService =
+                new TestingBlobStoreService(closeOperations);
+
+        final TestingHaServices haServices =
+                new TestingHaServices(
+                        new Configuration(),
+                        Executors.directExecutor(),
+                        testingBlobStoreService,
+                        closeOperations,
+                        () -> {
+                            throw new FlinkException("test exception");
+                        });
+
+        try {
+            haServices.closeAndCleanupAllData();
+            fail("Expected that the close operation fails.");
+        } catch (Exception expected) {
+
+        }
+
+        assertThat(closeOperations, contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE));
+    }
+
+    private enum CloseOperations {
+        HA_CLEANUP,
+        HA_CLOSE,
+        BLOB_CLEANUP_AND_CLOSE,
+        BLOB_CLOSE,
+    }
+
+    private static final class TestingBlobStoreService implements BlobStoreService {
+
+        private final Queue<CloseOperations> closeOperations;
+
+        private TestingBlobStoreService(Queue<CloseOperations> closeOperations) {
+            this.closeOperations = closeOperations;
+        }
+
+        @Override
+        public void closeAndCleanupAllData() {
+            closeOperations.offer(CloseOperations.BLOB_CLEANUP_AND_CLOSE);
+        }
+
+        @Override
+        public void close() throws IOException {
+            closeOperations.offer(CloseOperations.BLOB_CLOSE);
+        }
+
+        @Override
+        public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean delete(JobID jobId, BlobKey blobKey) {
+            return false;
+        }
+
+        @Override
+        public boolean deleteAll(JobID jobId) {
+            return false;
+        }
+
+        @Override
+        public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
+            return false;
+        }
+    }
+
+    private static final class TestingHaServices extends AbstractHaServices {
+
+        private final Queue<? super CloseOperations> closeOperations;
+        private final RunnableWithException internalCleanupRunnable;
+
+        private TestingHaServices(
+                Configuration config,
+                Executor ioExecutor,
+                BlobStoreService blobStoreService,
+                Queue<? super CloseOperations> closeOperations,
+                RunnableWithException internalCleanupRunnable) {
+            super(config, ioExecutor, blobStoreService);
+            this.closeOperations = closeOperations;
+            this.internalCleanupRunnable = internalCleanupRunnable;
+        }
+
+        @Override
+        protected LeaderElectionService createLeaderElectionService(String leaderName) {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected LeaderRetrievalService createLeaderRetrievalService(String leaderName) {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected JobGraphStore createJobGraphStore() throws Exception {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected RunningJobsRegistry createRunningJobsRegistry() {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected void internalClose() {
+            closeOperations.offer(CloseOperations.HA_CLOSE);
+        }
+
+        @Override
+        protected void internalCleanup() throws Exception {
+            internalCleanupRunnable.run();
+        }
+
+        @Override
+        protected String getLeaderNameForResourceManager() {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected String getLeaderNameForDispatcher() {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected String getLeaderNameForJobManager(JobID jobID) {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+
+        @Override
+        protected String getLeaderNameForRestServer() {
+            throw new UnsupportedOperationException("Not supported by this test implementation.");
+        }
+    }
+}
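
The ordering invariant that this commit enforces can be summarized outside of Flink's class hierarchy. The following is a minimal sketch of the same "delete the pointers before the payloads" pattern, assuming hypothetical PointerStore and PayloadStore interfaces in place of the HA store and the BlobStoreService; it is an illustration of the cleanup order established by the diff above, not Flink code.

/**
 * Minimal sketch of the cleanup ordering from FLINK-22014: delete the pointers
 * first, and only delete the referenced payloads once that has succeeded.
 * PointerStore and PayloadStore are hypothetical stand-ins, not Flink APIs.
 */
public final class CleanupOrderingSketch {

    interface PointerStore {
        void deleteAllPointers() throws Exception; // e.g. job and checkpoint pointers
        void close() throws Exception;
    }

    interface PayloadStore {
        void closeAndDeleteAllPayloads() throws Exception; // e.g. blobs
        void close() throws Exception;
    }

    static void closeAndCleanupAllData(PointerStore pointers, PayloadStore payloads)
            throws Exception {
        Exception exception = null;
        boolean deletedPointers = false;

        // 1) Delete the pointers first. If this fails, the payloads stay referenced.
        try {
            pointers.deleteAllPointers();
            deletedPointers = true;
        } catch (Exception e) {
            exception = e;
        }

        // 2) Always close the pointer store, keeping the first failure as the primary error.
        try {
            pointers.close();
        } catch (Exception e) {
            exception = suppress(exception, e);
        }

        // 3) Delete the payloads only if no pointer can reference them anymore;
        //    otherwise just close the store and leave the payloads in place.
        try {
            if (deletedPointers) {
                payloads.closeAndDeleteAllPayloads();
            } else {
                payloads.close();
            }
        } catch (Exception e) {
            exception = suppress(exception, e);
        }

        if (exception != null) {
            throw exception;
        }
    }

    private static Exception suppress(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }
}

With this order, a process that dies partway through leaves the system either fully intact or with only unreferenced payloads; it can never hold pointers to payloads that are already gone, which is exactly the state the new tests guard against.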