This is an automated email from the ASF dual-hosted git repository.

ritik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new af29cbe759 [ASTERIXDB-3564][STO]: Avoid halts on IO operation failures
af29cbe759 is described below

commit af29cbe759a8e9a8b865cad70fac01737f6ceb5b
Author: Savyasach Reddy <[email protected]>
AuthorDate: Thu Jan 16 17:38:06 2025 +0530

    [ASTERIXDB-3564][STO]: Avoid halts on IO operation failures
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    details:
    - Retry Flush Operations on IOException
    - Avoid Halts on Merge failure
    - Make Create Index operation retryable
    
    Ext-ref: MB-63040
    Change-Id: If253ea03f5baecbab226e527abb4267670a4233e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19187
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Ritik Raj <[email protected]>
    Tested-by: Ritik Raj <[email protected]>
---
 asterixdb/asterix-app/pom.xml                      |   4 +-
 .../org/apache/asterix/app/nc/HaltCallback.java    |  11 +-
 .../asterix/app/translator/QueryTranslator.java    |   6 +-
 .../test/cloud_storage/CloudStorageGCSTest.java    |   2 +
 ...STest.java => GCSCloudStorageUnstableTest.java} |  52 +++---
 .../cloud_storage/RetryingQueryTranslator.java     | 157 ++++++++++++++++
 .../UnstableStatementExecutorExtension.java        |  55 ++++++
 .../UnstableStatementExecutorFactory.java          |  46 +++++
 .../src/test/resources/cc-cloud-storage-gcs.conf   |   2 +
 .../asterix/cloud/CloudResettableInputStream.java  |   7 +-
 .../asterix/cloud/clients/UnstableCloudClient.java |  70 +++++++-
 .../cloud/clients/aws/s3/S3BufferedWriter.java     |   3 +-
 .../cloud/clients/google/gcs/GCSWriter.java        |  10 +-
 .../column/metadata/schema/ObjectSchemaNode.java   |   5 +-
 .../operation/lsm/flush/FlushColumnMetadata.java   |   7 +-
 .../ioopcallbacks/LSMIOOperationCallback.java      |  11 ++
 .../LSMSecondaryIndexBulkLoadNodePushable.java     |  19 +-
 .../apache/hyracks/api/exceptions/ErrorCode.java   |   1 +
 .../src/main/resources/errormsg/en.properties      |   1 +
 .../storage/am/bloomfilter/impls/BloomFilter.java  |  24 ++-
 .../storage/am/common/api/IPageManager.java        |   6 +
 .../IndexBulkLoadOperatorNodePushable.java         |  22 ++-
 .../AppendOnlyLinkedMetadataPageManager.java       |   7 +
 .../common/impls/AbstractTreeIndexBulkLoader.java  |   1 +
 .../storage/am/lsm/btree/impls/LSMBTree.java       | 122 +++++++------
 .../AbstractLSMWithBloomFilterDiskComponent.java   |   9 +-
 .../api/AbstractLSMWithBuddyDiskComponent.java     |  15 +-
 .../am/lsm/common/api/ILSMDiskComponent.java       |   5 +
 .../am/lsm/common/api/ILSMIOOperationCallback.java |   6 +
 .../lsm/common/impls/AbstractLSMDiskComponent.java |  14 +-
 .../am/lsm/common/impls/AbstractLSMIndex.java      |   7 +-
 .../impls/ChainedLSMDiskComponentBulkLoader.java   |  17 +-
 .../am/lsm/common/impls/EmptyComponent.java        |   5 +
 .../common/impls/LSMComponentFileReferences.java   |   2 +-
 .../storage/am/lsm/common/impls/LSMHarness.java    |   3 +-
 .../impls/LSMIndexDiskComponentBulkLoader.java     |   6 +-
 .../impls/LSMInvertedComponentFileReferences.java  |  43 +++++
 .../am/lsm/common/impls/LSMTreeIndexAccessor.java  |  14 ++
 .../storage/am/lsm/common/util/ComponentUtils.java |  16 ++
 .../lsm/invertedindex/impls/LSMInvertedIndex.java  | 188 ++++++++++++--------
 .../impls/LSMInvertedIndexAccessor.java            |  14 ++
 .../impls/LSMInvertedIndexDiskComponent.java       |  27 +--
 .../impls/LSMInvertedIndexFileManager.java         |  11 +-
 .../impls/LSMInvertedIndexFlushOperation.java      |   8 +-
 .../impls/LSMInvertedIndexMergeOperation.java      |   8 +-
 .../invertedindex/ondisk/OnDiskInvertedIndex.java  |   3 +
 .../storage/am/lsm/rtree/impls/LSMRTree.java       | 112 ++++++------
 .../rtree/impls/LSMRTreeWithAntiMatterTuples.java  | 197 +++++++++++----------
 .../hyracks/storage/am/rtree/impls/RTree.java      |  23 ++-
 .../storage/common/buffercache/BufferCache.java    |  12 +-
 .../common/buffercache/FIFOLocalWriter.java        |  14 +-
 .../common/buffercache/IFIFOPageWriter.java        |   4 +-
 .../hyracks/util/ExponentialRetryPolicy.java       |  22 ++-
 53 files changed, 1055 insertions(+), 401 deletions(-)

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 35344ab195..ab938d8384 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -486,7 +486,7 @@
           
**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,
           
**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,
           
**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,
-          
**/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java,
+          
**/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java,**/GCSCloudStorageUnstableTest.java,
           
**/AzureBlobStorageExternalDatasetOnePartitionTest.java,**/SqlppSinglePointLookupExecutionTest.java,
           **/Atomic*.java, **/AwsS3*.java, **/*SqlppHdfs*.java, 
**/*RQGTest.java, **/*RQJTest.java
         </test.excludes>
@@ -553,7 +553,7 @@
       <id>asterix-gerrit-cloud-nons3-tests</id>
       <properties>
         <test.includes>
-          **/CloudStorageGCSTest.java, 
**/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java,
+          
**/GCSCloudStorageUnstableTest.java,**/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java,
           
**/AzureBlobStorageExternalDatasetOnePartitionTest.java,**/CloudStorageUnstableTest.java,
 **/*SqlppHdfs*.java
         </test.includes>
         <failIfNoTests>false</failIfNoTests>
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index 466b617783..f66f097b4f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -47,12 +47,9 @@ public class HaltCallback implements 
IIoOperationFailedCallback {
     }
 
     private boolean haltOnFailure(ILSMIOOperation operation) {
-        switch (operation.getIOOperationType()) {
-            case CLEANUP:
-            case REPLICATE:
-                return false;
-            default:
-                return true;
-        }
+        return switch (operation.getIOOperationType()) {
+            case CLEANUP, REPLICATE, MERGE -> false;
+            default -> true;
+        };
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f7a6dc12af..fde71f8c71 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -481,7 +481,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                         if (stats.getProfileType() == Stats.ProfileType.FULL) {
                             this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
                         }
-                        handleLoadStatement(metadataProvider, stmt, hcc);
+                        handleLoadStatement(metadataProvider, stmt, hcc, 
requestParameters);
                         break;
                     case COPY_FROM:
                         if (stats.getProfileType() == Stats.ProfileType.FULL) {
@@ -4025,8 +4025,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleLoadStatement(MetadataProvider metadataProvider, 
Statement stmt, IHyracksClientConnection hcc)
-            throws Exception {
+    protected void handleLoadStatement(MetadataProvider metadataProvider, 
Statement stmt, IHyracksClientConnection hcc,
+            IRequestParameters requestParameters) throws Exception {
         LoadStatement loadStmt = (LoadStatement) stmt;
         String datasetName = loadStmt.getDatasetName();
         metadataProvider.validateDatabaseObjectName(loadStmt.getNamespace(), 
datasetName, loadStmt.getSourceLocation());
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
index 65b5adf5e6..8c071f08a1 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
@@ -40,6 +40,7 @@ import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.MethodSorters;
@@ -57,6 +58,7 @@ import com.google.cloud.storage.StorageOptions;
  * Run tests in cloud deployment environment
  */
 @RunWith(Parameterized.class)
+@Ignore
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class CloudStorageGCSTest {
 
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/GCSCloudStorageUnstableTest.java
similarity index 73%
copy from 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
copy to 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/GCSCloudStorageUnstableTest.java
index 65b5adf5e6..c97e459fe3 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/GCSCloudStorageUnstableTest.java
@@ -16,27 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.asterix.test.cloud_storage;
 
 import static 
org.apache.asterix.api.common.LocalCloudUtil.CLOUD_STORAGE_BUCKET;
 import static org.apache.asterix.api.common.LocalCloudUtil.MOCK_SERVER_REGION;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Objects;
-import java.util.Random;
 
 import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.storage.StorageIOStats;
 import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
 import org.apache.asterix.test.runtime.LangExecutionUtil;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.asterix.testframework.xml.Description;
 import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
@@ -44,7 +51,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.MethodSorters;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 import com.google.cloud.NoCredentials;
 import com.google.cloud.storage.Blob;
@@ -58,7 +64,7 @@ import com.google.cloud.storage.StorageOptions;
  */
 @RunWith(Parameterized.class)
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class CloudStorageGCSTest {
+public class GCSCloudStorageUnstableTest {
 
     private static final Logger LOGGER = LogManager.getLogger();
 
@@ -71,13 +77,14 @@ public class CloudStorageGCSTest {
     public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:24443";
     private static final String MOCK_SERVER_PROJECT_ID = 
"asterixdb-gcs-test-project-id";
 
-    public CloudStorageGCSTest(TestCaseContext tcCtx) {
+    public GCSCloudStorageUnstableTest(TestCaseContext tcCtx) {
         this.tcCtx = tcCtx;
     }
 
     @BeforeClass
     public static void setUp() throws Exception {
         LocalCloudUtilAdobeMock.startS3CloudEnvironment(true, true);
+        System.setProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE, 
"true");
         Storage storage = 
StorageOptions.newBuilder().setHost(MOCK_SERVER_HOSTNAME)
                 
.setCredentials(NoCredentials.getInstance()).setProjectId(MOCK_SERVER_PROJECT_ID).build().getService();
         cleanup(storage);
@@ -92,27 +99,13 @@ public class CloudStorageGCSTest {
 
     @AfterClass
     public static void tearDown() throws Exception {
+        System.clearProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE);
         LangExecutionUtil.tearDown();
-        LocalCloudUtilAdobeMock.shutdownSilently();
     }
 
-    @Parameters(name = "CloudStorageGCSTest {index}: {0}")
+    @Parameterized.Parameters(name = "GCSCloudStorageUnstableTest {index}: 
{0}")
     public static Collection<Object[]> tests() throws Exception {
-        long seed = System.nanoTime();
-        Random random = new Random(seed);
-        LOGGER.info("CloudStorageGCSTest seed {}", seed);
-        Collection<Object[]> tests = LangExecutionUtil.tests(ONLY_TESTS, 
SUITE_TESTS);
-        List<Object[]> selected = new ArrayList<>();
-        for (Object[] test : tests) {
-            if (!Objects.equals(((TestCaseContext) 
test[0]).getTestGroups()[0].getName(), "sqlpp_queries")) {
-                selected.add(test);
-            }
-            // Select 10% of the tests randomly
-            else if (random.nextInt(10) == 0) {
-                selected.add(test);
-            }
-        }
-        return selected;
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
     }
 
     @Test
@@ -120,6 +113,19 @@ public class CloudStorageGCSTest {
         List<TestCase.CompilationUnit> cu = 
tcCtx.getTestCase().getCompilationUnit();
         Assume.assumeTrue(cu.size() > 1 || 
!EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
         LangExecutionUtil.test(tcCtx);
+        for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs) 
{
+            IDatasetLifecycleManager lifecycleManager =
+                    ((INcApplicationContext) 
nc.getApplicationContext()).getDatasetLifecycleManager();
+            StorageIOStats stats = lifecycleManager.getDatasetsIOStats();
+            while (stats.getPendingFlushes() != 0 || stats.getPendingMerges() 
!= 0) {
+                stats = lifecycleManager.getDatasetsIOStats();
+            }
+        }
+        IBufferCache bufferCache;
+        for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs) 
{
+            bufferCache = ((INcApplicationContext) 
nc.getApplicationContext()).getBufferCache();
+            Assert.assertTrue(((BufferCache) bufferCache).isClean());
+        }
     }
 
     private static String getText(Description description) {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/RetryingQueryTranslator.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/RetryingQueryTranslator.java
new file mode 100644
index 0000000000..af580b26e8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/RetryingQueryTranslator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.cloud_storage;
+
+import static org.apache.asterix.cloud.clients.UnstableCloudClient.ERROR_RATE;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.api.IResponsePrinter;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.LoadStatement;
+import org.apache.asterix.lang.common.statement.TruncateDatasetStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.utils.Creator;
+import org.apache.asterix.translator.IRequestParameters;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class RetryingQueryTranslator extends QueryTranslator {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public RetryingQueryTranslator(ICcApplicationContext appCtx, 
List<Statement> statements, SessionOutput output,
+            ILangCompilationProvider compilationProvider, ExecutorService 
executorService,
+            IResponsePrinter responsePrinter) {
+        super(appCtx, statements, output, compilationProvider, 
executorService, responsePrinter);
+    }
+
+    @Override
+    public void handleCreateIndexStatement(MetadataProvider metadataProvider, 
Statement stmt,
+            IHyracksClientConnection hcc, IRequestParameters 
requestParameters, Creator creator) throws Exception {
+        int times = 100;
+        Exception ex = null;
+        double initialErrorRate = ERROR_RATE.get();
+        try {
+            while (times-- > 0) {
+                try {
+                    super.handleCreateIndexStatement(metadataProvider, stmt, 
hcc, requestParameters, creator);
+                    ex = null;
+                    break;
+                } catch (Exception e) {
+                    ERROR_RATE.set(Double.max(ERROR_RATE.get() - 0.01d, 
0.01d));
+                    if (retryOnFailure(e)) {
+                        LOGGER.error("Attempt: {}, Failed to create index", 
100 - times, e);
+                        metadataProvider.getLocks().reset();
+                        ex = e;
+                    } else {
+                        throw e;
+                    }
+                }
+            }
+            if (ex != null) {
+                throw ex;
+            }
+        } finally {
+            ERROR_RATE.set(initialErrorRate);
+        }
+    }
+
+    @Override
+    public void handleAnalyzeStatement(MetadataProvider metadataProvider, 
Statement stmt, IHyracksClientConnection hcc,
+            IRequestParameters requestParameters) throws Exception {
+        int times = 100;
+        Exception ex = null;
+        while (times-- > 0) {
+            try {
+                super.handleAnalyzeStatement(metadataProvider, stmt, hcc, 
requestParameters);
+                ex = null;
+                break;
+            } catch (Exception e) {
+                if (retryOnFailure(e)) {
+                    LOGGER.error("Attempt: {}, Failed to create index", 100 - 
times, e);
+                    metadataProvider.getLocks().reset();
+                    ex = e;
+                } else {
+                    throw e;
+                }
+            }
+        }
+        if (ex != null) {
+            throw ex;
+        }
+    }
+
+    @Override
+    public void handleLoadStatement(MetadataProvider metadataProvider, 
Statement stmt, IHyracksClientConnection hcc,
+            IRequestParameters requestParameters) throws Exception {
+        int times = 100;
+        Exception ex = null;
+        double initialErrorRate = ERROR_RATE.get();
+        try {
+            while (times-- > 0) {
+                try {
+                    super.handleLoadStatement(metadataProvider, stmt, hcc, 
requestParameters);
+                    ex = null;
+                    break;
+                } catch (Exception e) {
+                    ERROR_RATE.set(Double.max(ERROR_RATE.get() - 0.01d, 
0.01d));
+                    if (retryOnFailure(e)) {
+                        LOGGER.error("Attempt: {}, Failed to load", 100 - 
times, e);
+                        metadataProvider.getLocks().reset();
+                        ex = e;
+                        LoadStatement loadStmt = (LoadStatement) stmt;
+                        TruncateDatasetStatement truncateDatasetStatement = 
new TruncateDatasetStatement(
+                                loadStmt.getNamespace(), new 
Identifier(loadStmt.getDatasetName()), true);
+                        super.handleDatasetTruncateStatement(metadataProvider, 
truncateDatasetStatement,
+                                requestParameters);
+                        metadataProvider.getLocks().reset();
+
+                    } else {
+                        throw e;
+                    }
+                }
+            }
+            if (ex != null) {
+                throw ex;
+            }
+        } finally {
+            ERROR_RATE.set(initialErrorRate);
+        }
+    }
+
+    private boolean retryOnFailure(Exception e) {
+        if (e instanceof HyracksDataException) {
+            return ((HyracksDataException) e).getErrorCode() == 
ErrorCode.FAILED_IO_OPERATION.intValue()
+                    && 
ExceptionUtils.getRootCause(e).getMessage().contains("Simulated error");
+        }
+        return false;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorExtension.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorExtension.java
new file mode 100644
index 0000000000..bf9aecbb83
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorExtension.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.cloud_storage;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.asterix.app.cc.IStatementExecutorExtension;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.IServiceContext;
+
+public class UnstableStatementExecutorExtension implements 
IStatementExecutorExtension {
+
+    public static final ExtensionId RETRYING_QUERY_TRANSLATOR_EXTENSION_ID =
+            new 
ExtensionId(UnstableStatementExecutorExtension.class.getSimpleName(), 0);
+
+    @Override
+    public ExtensionId getId() {
+        return RETRYING_QUERY_TRANSLATOR_EXTENSION_ID;
+    }
+
+    @Override
+    public void configure(List<Pair<String, String>> args, IServiceContext 
serviceCtx) {
+
+    }
+
+    @Override
+    public IStatementExecutorFactory getQueryTranslatorFactory() {
+        return null;
+    }
+
+    @Override
+    public IStatementExecutorFactory 
getStatementExecutorFactory(ExecutorService executorService) {
+        return new UnstableStatementExecutorFactory(executorService);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorFactory.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorFactory.java
new file mode 100644
index 0000000000..f65276bb2d
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.cloud_storage;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.api.IResponsePrinter;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.translator.SessionOutput;
+
+public class UnstableStatementExecutorFactory extends 
DefaultStatementExecutorFactory {
+    public UnstableStatementExecutorFactory(ExecutorService executorService) {
+        super(executorService);
+    }
+
+    @Override
+    public QueryTranslator create(ICcApplicationContext appCtx, 
List<Statement> statements, SessionOutput output,
+            ILangCompilationProvider compilationProvider, 
IStorageComponentProvider storageComponentProvider,
+            IResponsePrinter responsePrinter) {
+        return new RetryingQueryTranslator(appCtx, statements, output, 
compilationProvider, executorService,
+                responsePrinter);
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf 
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
index 9b6f5477cb..376252e690 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
@@ -73,3 +73,5 @@ cloud.storage.anonymous.auth=true
 cloud.storage.cache.policy=selective
 cloud.max.write.requests.per.second=1000
 cloud.max.read.requests.per.second=5000
+
+[extension/org.apache.asterix.test.cloud_storage.UnstableStatementExecutorExtension]
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index a233ca5afb..9b832a2018 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
 import org.apache.hyracks.cloud.io.request.ICloudRequest;
@@ -158,7 +159,7 @@ public class CloudResettableInputStream extends InputStream 
implements ICloudWri
                     
CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, retry);
                 } catch (Exception e) {
                     LOGGER.error(e);
-                    throw HyracksDataException.create(e);
+                    throw 
HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, e);
                 }
             }
             bufferedWriter.finish();
@@ -190,7 +191,7 @@ public class CloudResettableInputStream extends InputStream 
implements ICloudWri
         try {
             close();
         } catch (IOException e) {
-            throw HyracksDataException.create(e);
+            throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, 
e);
         }
     }
 
@@ -203,7 +204,7 @@ public class CloudResettableInputStream extends InputStream 
implements ICloudWri
             CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, 
retry);
         } catch (Exception e) {
             LOGGER.error(e);
-            throw HyracksDataException.create(e);
+            throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, 
e);
         }
 
         writeBuffer.clear();
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
index 28fa53e5f3..1b39251334 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -25,11 +25,15 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.asterix.cloud.CloudResettableInputStream;
 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.apache.asterix.cloud.clients.google.gcs.GCSWriter;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.control.nc.io.IOManager;
@@ -39,7 +43,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class UnstableCloudClient implements ICloudClient {
     // 10% error rate
-    private static final double ERROR_RATE = 0.1d;
+    public static final AtomicReference<Double> ERROR_RATE = new 
AtomicReference<>(0.11d);
     private static final Random RANDOM = new Random(0);
     private final ICloudClient cloudClient;
 
@@ -61,6 +65,9 @@ public class UnstableCloudClient implements ICloudClient {
     public ICloudWriter createWriter(String bucket, String path, 
IWriteBufferProvider bufferProvider) {
         if (cloudClient instanceof S3CloudClient) {
             return createUnstableWriter((S3CloudClient) cloudClient, bucket, 
path, bufferProvider);
+        } else if (cloudClient instanceof GCSCloudClient) {
+            return new UnstableGCSCloudWriter(cloudClient.createWriter(bucket, 
path, bufferProvider),
+                    cloudClient.getWriteBufferSize());
         }
         return cloudClient.createWriter(bucket, path, bufferProvider);
     }
@@ -138,8 +145,8 @@ public class UnstableCloudClient implements ICloudClient {
 
     private static void fail() throws HyracksDataException {
         double prob = RANDOM.nextInt(100) / 100.0d;
-        if (prob <= ERROR_RATE) {
-            throw HyracksDataException.create(new IOException("Simulated 
error"));
+        if (prob < ERROR_RATE.get()) {
+            throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, 
new IOException("Simulated error"));
         }
     }
 
@@ -150,6 +157,63 @@ public class UnstableCloudClient implements ICloudClient {
         return new CloudResettableInputStream(bufferedWriter, bufferProvider);
     }
 
+    /**
+     * An unstable cloud writer that mimics the functionality of {@link 
GCSWriter}
+     */
+    private static class UnstableGCSCloudWriter implements ICloudWriter {
+        private final ICloudWriter writer;
+        private final int writeBufferSize;
+
+        UnstableGCSCloudWriter(ICloudWriter writer, int writeBufferSize) {
+            this.writer = writer;
+            this.writeBufferSize = writeBufferSize;
+        }
+
+        @Override
+        public int write(ByteBuffer header, ByteBuffer page) throws 
HyracksDataException {
+            return write(header) + write(page);
+        }
+
+        @Override
+        public int write(ByteBuffer page) throws HyracksDataException {
+            if (position() == 0) {
+                fail();
+            }
+            long uploadsToBeTriggered =
+                    ((position() + page.remaining()) / writeBufferSize) - 
(position() / writeBufferSize);
+            while (uploadsToBeTriggered-- > 0) {
+                fail();
+            }
+            return writer.write(page);
+        }
+
+        @Override
+        public void write(int b) throws HyracksDataException {
+            write(ByteBuffer.wrap(new byte[] { (byte) b }));
+        }
+
+        @Override
+        public int write(byte[] b, int off, int len) throws 
HyracksDataException {
+            return write(ByteBuffer.wrap(b, off, len));
+        }
+
+        @Override
+        public long position() {
+            return writer.position();
+        }
+
+        @Override
+        public void finish() throws HyracksDataException {
+            fail();
+            writer.finish();
+        }
+
+        @Override
+        public void abort() throws HyracksDataException {
+            writer.abort();
+        }
+    }
+
     private static class UnstableCloudBufferedWriter implements 
ICloudBufferedWriter {
         private final ICloudBufferedWriter bufferedWriter;
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 53d6546919..7bd7ad5bbf 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 import org.apache.logging.log4j.LogManager;
@@ -131,7 +132,7 @@ public class S3BufferedWriter implements 
ICloudBufferedWriter {
         try {
             s3Client.completeMultipartUpload(request);
         } catch (Exception e) {
-            throw HyracksDataException.create(e);
+            throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, 
e);
         }
     }
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 89d8fd549a..3a837867c6 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -71,15 +72,15 @@ public class GCSWriter implements ICloudWriter {
         while (uploadsToBeTriggered-- > 0) {
             profiler.objectMultipartUpload();
         }
-        setUploadId();
 
         int written = 0;
         try {
+            setUploadId();
             while (page.hasRemaining()) {
                 written += writer.write(page);
             }
         } catch (IOException | RuntimeException e) {
-            throw HyracksDataException.create(e);
+            throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, 
e);
         }
 
         writtenBytes += written;
@@ -104,13 +105,14 @@ public class GCSWriter implements ICloudWriter {
     @Override
     public void finish() throws HyracksDataException {
         guardian.checkWriteAccess(bucket, path);
-        setUploadId();
         profiler.objectMultipartUpload();
         try {
+            setUploadId();
+
             writer.close();
             writer = null;
         } catch (IOException | RuntimeException e) {
-            throw HyracksDataException.create(e);
+            throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, 
e);
         }
         log("FINISHED");
     }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
index 1f74fb3c0a..4cf2d23e40 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
@@ -164,7 +164,10 @@ public final class ObjectSchemaNode extends 
AbstractSchemaNestedNode {
 
     public void abort(DataInputStream input, Map<AbstractSchemaNestedNode, 
RunLengthIntArray> definitionLevels)
             throws IOException {
-        definitionLevels.put(this, new RunLengthIntArray());
+        input.readByte(); // Skip the type tag, see ObjectSchemaNode#serialize
+        if (definitionLevels != null) {
+            definitionLevels.put(this, new RunLengthIntArray());
+        }
 
         int numberOfChildren = input.readInt();
 
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index 5aca3a4577..9c58247ca0 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -271,7 +271,8 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
 
     @Override
     public void abort() throws HyracksDataException {
-        DataInputStream input = new DataInputStream(new 
ByteArrayInputStream(serializedMetadata.getByteArray()));
+        DataInputStream input = new DataInputStream(new 
ByteArrayInputStream(serializedMetadata.getByteArray(),
+                serializedMetadata.getStartOffset(), 
serializedMetadata.getLength()));
         try {
             abort(input);
         } catch (IOException e) {
@@ -280,6 +281,7 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
     }
 
     private void abort(DataInputStream input) throws IOException {
+        input.skipBytes(OFFSETS_SIZE);
         level = -1;
         repeated = 0;
         changed = false;
@@ -290,6 +292,9 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
         fieldNamesDictionary.abort(input);
         definitionLevels.clear();
         root.abort(input, definitionLevels);
+        if (metaRoot != null) {
+            metaRoot.abort(input, definitionLevels);
+        }
     }
 
     public static void deserializeWriters(DataInput input, 
List<IColumnValuesWriter> writers,
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 87aa3bd3c0..52af32909a 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -141,6 +141,17 @@ public class LSMIOOperationCallback implements 
ILSMIOOperationCallback {
         }
     }
 
+    @Override
+    public void afterFailure(ILSMIOOperation operation) {
+        if (isMerge(operation)) {
+            try {
+                ioManager.delete(getOperationMaskFilePath(operation));
+            } catch (HyracksDataException e) {
+                operation.getFailure().addSuppressed(e);
+            }
+        }
+    }
+
     protected void addComponentToCheckpoint(ILSMIOOperation operation) throws 
HyracksDataException {
         // will always update the checkpoint file even if no new component was 
created
         FileReference target = operation.getTarget();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index 21c3b27c70..17c5445514 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -57,6 +57,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends 
AbstractLSMSecondaryI
 
     private LSMIndexDiskComponentBulkLoader componentBulkLoader;
     private int currentComponentPos = -1;
+    private boolean failed = false;
 
     public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int 
partition, RecordDescriptor inputRecDesc,
             IIndexDataflowHelperFactory primaryIndexHelperFactory,
@@ -98,7 +99,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends 
AbstractLSMSecondaryI
     public void close() throws HyracksDataException {
         HyracksDataException closeException = null;
         try {
-            endCurrentComponent();
+            endOrAbortCurrentComponent();
         } catch (HyracksDataException e) {
             closeException = e;
         }
@@ -151,6 +152,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends 
AbstractLSMSecondaryI
     @Override
     public void fail() throws HyracksDataException {
         writer.fail();
+        failed = true;
     }
 
     @Override
@@ -177,15 +179,22 @@ public class LSMSecondaryIndexBulkLoadNodePushable 
extends AbstractLSMSecondaryI
         }
     }
 
-    private void endCurrentComponent() throws HyracksDataException {
+    private void endOrAbortCurrentComponent() throws HyracksDataException {
         if (componentBulkLoader != null) {
-            componentBulkLoader.end();
-            componentBulkLoader = null;
+            try {
+                if (!failed) {
+                    componentBulkLoader.end();
+                } else {
+                    componentBulkLoader.abort();
+                }
+            } finally {
+                componentBulkLoader = null;
+            }
         }
     }
 
     private void loadNewComponent(int componentPos) throws 
HyracksDataException {
-        endCurrentComponent();
+        endOrAbortCurrentComponent(); // This should never call 
componentBulkLoader.abort()
         int numTuples = getNumDeletedTuples(componentPos);
         ILSMDiskComponent primaryComponent = 
primaryIndex.getDiskComponents().get(componentPos);
         Map<String, Object> parameters = new HashMap<>();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7cb107dfc5..d7abf4c057 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -162,6 +162,7 @@ public enum ErrorCode implements IError {
     EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA(132),
     EMPTY_TYPE_INFERRED(133),
     SCHEMA_LIMIT_EXCEEDED(134),
+    FAILED_IO_OPERATION(135),
 
     // Compilation error codes.
     RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index e1fbe3020c..e9569be66f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -152,6 +152,7 @@
 132 = Extra field in the result, field '%1$s' does not exist at '%2$s' in the 
schema
 133 = Schema could not be inferred, empty types found in the result
 134 = Schema Limit exceeded, maximum number of heterogeneous schemas allowed : 
'%1$s'
+135 = An IO Operation has failed
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index aa174741a6..abb03ebbff 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -386,13 +386,15 @@ public class BloomFilter {
 
         @Override
         public void end() throws HyracksDataException {
-            allocateAndInitMetaDataPage();
-            pageWriter.write(metaDataPage);
-            for (ICachedPage p : pages) {
-                pageWriter.write(p);
-            }
-            if (hasFailed()) {
-                throw HyracksDataException.create(getFailure());
+            try {
+                allocateAndInitMetaDataPage();
+                pageWriter.write(metaDataPage);
+                for (ICachedPage p : pages) {
+                    pageWriter.write(p);
+                }
+            } catch (HyracksDataException e) {
+                handleException();
+                throw e;
             }
             BloomFilter.this.numBits = numBits;
             BloomFilter.this.numHashes = numHashes;
@@ -401,8 +403,7 @@ public class BloomFilter {
             BloomFilter.this.version = BLOCKED_BLOOM_FILTER_VERSION;
         }
 
-        @Override
-        public void abort() throws HyracksDataException {
+        private void handleException() {
             for (ICachedPage p : pages) {
                 if (p != null) {
                     bufferCache.returnPage(p, false);
@@ -413,6 +414,11 @@ public class BloomFilter {
             }
         }
 
+        @Override
+        public void abort() throws HyracksDataException {
+            handleException();
+        }
+
         @Override
         public void force() throws HyracksDataException {
             bufferCache.force(fileId, false);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
index 208d3b701e..ff28464a93 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
@@ -48,6 +48,12 @@ public interface IPageManager {
      */
     void close(IPageWriteFailureCallback failureCallback) throws 
HyracksDataException;
 
+    /**
+     * Return all the pages from the page manager
+     */
+    default void returnAllPages() {
+    }
+
     /**
      * Create a metadata frame to be used for reading and writing to metadata 
pages
      *
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 0206f201d0..d606a5c649 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -61,6 +61,7 @@ public class IndexBulkLoadOperatorNodePushable extends 
AbstractUnaryInputUnaryOu
     protected final IIndexBulkLoader[] bulkLoaders;
     protected ITupleFilter tupleFilter;
     protected FrameTupleReference frameTuple;
+    private boolean failed = false;
 
     public IndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory 
indexHelperFactory, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, float fillFactor, boolean 
verifyInput, long numElementsHint,
@@ -151,6 +152,7 @@ public class IndexBulkLoadOperatorNodePushable extends 
AbstractUnaryInputUnaryOu
     @Override
     public void fail() throws HyracksDataException {
         writer.fail();
+        failed = true;
     }
 
     protected void initializeBulkLoader(IIndex index, int indexId) throws 
HyracksDataException {
@@ -159,12 +161,28 @@ public class IndexBulkLoadOperatorNodePushable extends 
AbstractUnaryInputUnaryOu
     }
 
     private void closeBulkLoaders() throws HyracksDataException {
+        HyracksDataException failure = null;
         for (IIndexBulkLoader bulkLoader : bulkLoaders) {
             // bulkloader can be null if an exception is thrown before it is 
initialized.
-            if (bulkLoader != null) {
-                bulkLoader.end();
+            try {
+                if (bulkLoader != null) {
+                    if (failure == null && !failed) {
+                        bulkLoader.end();
+                    } else {
+                        bulkLoader.abort();
+                    }
+                }
+            } catch (HyracksDataException e) {
+                if (failure == null) {
+                    failure = e;
+                } else {
+                    failure.addSuppressed(e);
+                }
             }
         }
+        if (failure != null) {
+            throw failure;
+        }
     }
 
     protected static void closeIndexes(IIndex[] indexes, 
IIndexDataflowHelper[] indexHelpers,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index dae01bfe88..9c51d92202 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -152,6 +152,13 @@ public class AppendOnlyLinkedMetadataPageManager 
implements IMetadataPageManager
         metadataPages.clear();
     }
 
+    @Override
+    public void returnAllPages() {
+        for (ICachedPage page : metadataPages) {
+            bufferCache.returnPage(page, false);
+        }
+    }
+
     /**
      * For storage on append-only media (such as HDFS), the meta data page has 
to be written last.
      * However, some implementations still write the meta data to the front. 
To deal with this as well
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
index d5f836c51b..5ef3aab8b2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
@@ -130,6 +130,7 @@ public abstract class AbstractTreeIndexBulkLoader extends 
PageWriteFailureCallba
             }
         }
         releasedLatches = true;
+        freePageManager.returnAllPages();
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 05e78dd44d..3526d2f144 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -238,73 +238,83 @@ public class LSMBTree extends AbstractLSMIndex implements 
ITreeIndex {
         LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation;
         LSMBTreeMemoryComponent flushingComponent = (LSMBTreeMemoryComponent) 
flushOp.getFlushingComponent();
         IIndexAccessor accessor = 
flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
-        ILSMDiskComponent component;
-        ILSMDiskComponentBulkLoader componentBulkLoader;
+        ILSMDiskComponent component = null;
+        ILSMDiskComponentBulkLoader componentBulkLoader = null;
         try {
-            RangePredicate nullPred = new RangePredicate(null, null, true, 
true, null, null);
-            long numElements = 0L;
-            if (hasBloomFilter) {
-                //count elements in btree for creating Bloomfilter
-                IIndexCursor countingCursor = ((BTreeAccessor) 
accessor).createCountingSearchCursor();
-                accessor.search(countingCursor, nullPred);
+            try {
+                RangePredicate nullPred = new RangePredicate(null, null, true, 
true, null, null);
+                long numElements = 0L;
+                if (hasBloomFilter) {
+                    //count elements in btree for creating Bloomfilter
+                    IIndexCursor countingCursor = ((BTreeAccessor) 
accessor).createCountingSearchCursor();
+                    accessor.search(countingCursor, nullPred);
+                    try {
+                        while (countingCursor.hasNext()) {
+                            countingCursor.next();
+                            ITupleReference countTuple = 
countingCursor.getTuple();
+                            numElements = 
IntegerPointable.getInteger(countTuple.getFieldData(0),
+                                    countTuple.getFieldStart(0));
+                        }
+                    } finally {
+                        try {
+                            countingCursor.close();
+                        } finally {
+                            countingCursor.destroy();
+                        }
+                    }
+                }
+                component = createDiskComponent(componentFactory, 
flushOp.getTarget(), null,
+                        flushOp.getBloomFilterTarget(), true);
+                componentBulkLoader = component.createBulkLoader(operation, 
1.0f, false, numElements, false, false,
+                        false, 
pageWriteCallbackFactory.createPageWriteCallback());
+                IIndexCursor scanCursor = accessor.createSearchCursor(false);
+                accessor.search(scanCursor, nullPred);
                 try {
-                    while (countingCursor.hasNext()) {
-                        countingCursor.next();
-                        ITupleReference countTuple = countingCursor.getTuple();
-                        numElements =
-                                
IntegerPointable.getInteger(countTuple.getFieldData(0), 
countTuple.getFieldStart(0));
+                    while (scanCursor.hasNext()) {
+                        scanCursor.next();
+                        // we can safely throw away updated tuples in 
secondary BTree components, because they correspond to
+                        // deleted tuples
+                        if (updateAware && ((LSMBTreeTupleReference) 
scanCursor.getTuple()).isUpdated()) {
+                            continue;
+                        }
+                        componentBulkLoader.add(scanCursor.getTuple());
                     }
                 } finally {
                     try {
-                        countingCursor.close();
+                        scanCursor.close();
                     } finally {
-                        countingCursor.destroy();
+                        scanCursor.destroy();
                     }
                 }
+            } finally {
+                accessor.destroy();
             }
-            component = createDiskComponent(componentFactory, 
flushOp.getTarget(), null, flushOp.getBloomFilterTarget(),
-                    true);
-            componentBulkLoader = component.createBulkLoader(operation, 1.0f, 
false, numElements, false, false, false,
-                    pageWriteCallbackFactory.createPageWriteCallback());
-            IIndexCursor scanCursor = accessor.createSearchCursor(false);
-            accessor.search(scanCursor, nullPred);
+            if (component.getLSMComponentFilter() != null) {
+                List<ITupleReference> filterTuples = new ArrayList<>();
+                
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
+                
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
+                
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+                        NoOpOperationCallback.INSTANCE);
+                
getFilterManager().writeFilter(component.getLSMComponentFilter(), 
component.getMetadataHolder());
+            }
+            // Write metadata from memory component to disk
+            // Q. what about the merge operation? how do we resolve conflicts
+            // A. Through providing an appropriate ILSMIOOperationCallback
+            // Must not reset the metadata before the flush is completed
+            // Use the copy of the metadata in the opContext
+            // TODO This code should be in the callback and not in the index
+            flushingComponent.getMetadata().copy(component.getMetadata());
+            componentBulkLoader.end();
+        } catch (Throwable e) {
             try {
-                while (scanCursor.hasNext()) {
-                    scanCursor.next();
-                    // we can safely throw away updated tuples in secondary 
BTree components, because they correspond to
-                    // deleted tuples
-                    if (updateAware && ((LSMBTreeTupleReference) 
scanCursor.getTuple()).isUpdated()) {
-                        continue;
-                    }
-                    componentBulkLoader.add(scanCursor.getTuple());
-                }
-            } finally {
-                try {
-                    scanCursor.close();
-                } finally {
-                    scanCursor.destroy();
+                if (componentBulkLoader != null) {
+                    componentBulkLoader.abort();
                 }
+            } catch (Throwable th) {
+                e.addSuppressed(th);
             }
-        } finally {
-            accessor.destroy();
-        }
-        if (component.getLSMComponentFilter() != null) {
-            List<ITupleReference> filterTuples = new ArrayList<>();
-            
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
-            
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
-            getFilterManager().updateFilter(component.getLSMComponentFilter(), 
filterTuples,
-                    NoOpOperationCallback.INSTANCE);
-            getFilterManager().writeFilter(component.getLSMComponentFilter(), 
component.getMetadataHolder());
+            throw e;
         }
-        // Write metadata from memory component to disk
-        // Q. what about the merge operation? how do we resolve conflicts
-        // A. Through providing an appropriate ILSMIOOperationCallback
-        // Must not reset the metadata before the flush is completed
-        // Use the copy of the metadata in the opContext
-        // TODO This code should be in the callback and not in the index
-        flushingComponent.getMetadata().copy(component.getMetadata());
-
-        componentBulkLoader.end();
 
         return component;
     }
@@ -313,7 +323,7 @@ public class LSMBTree extends AbstractLSMIndex implements 
ITreeIndex {
     public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws 
HyracksDataException {
         LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
         IIndexCursor cursor = mergeOp.getCursor();
-        ILSMDiskComponent mergedComponent;
+        ILSMDiskComponent mergedComponent = null;
         ILSMDiskComponentBulkLoader componentBulkLoader = null;
         try {
             try {
@@ -349,6 +359,7 @@ public class LSMBTree extends AbstractLSMIndex implements 
ITreeIndex {
                 
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(),
                         mergedComponent.getMetadataHolder());
             }
+            componentBulkLoader.end();
         } catch (Throwable e) { // NOSONAR.. As per the contract, we should 
either abort or end
             try {
                 if (componentBulkLoader != null) {
@@ -359,7 +370,6 @@ public class LSMBTree extends AbstractLSMIndex implements 
ITreeIndex {
             }
             throw e;
         }
-        componentBulkLoader.end();
         return mergedComponent;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
index db36c72a99..c77d7ff884 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
@@ -47,8 +47,13 @@ public abstract class 
AbstractLSMWithBloomFilterDiskComponent extends AbstractLS
     public void markAsValid(boolean persist, IPageWriteFailureCallback 
callback) throws HyracksDataException {
         // The order of forcing the dirty page to be flushed is critical. The
         // bloom filter must be always done first.
-        ComponentUtils.markAsValid(getBloomFilterBufferCache(), 
getBloomFilter(), persist);
-        super.markAsValid(persist, callback);
+        try {
+            ComponentUtils.markAsValid(getBloomFilterBufferCache(), 
getBloomFilter(), persist);
+            super.markAsValid(persist, callback);
+        } catch (HyracksDataException ex) {
+            returnPages();
+            throw ex;
+        }
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
index fda48f04c3..158a3b8871 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
@@ -40,8 +40,19 @@ public abstract class AbstractLSMWithBuddyDiskComponent 
extends AbstractLSMWithB
 
     @Override
     public void markAsValid(boolean persist, IPageWriteFailureCallback 
callback) throws HyracksDataException {
-        super.markAsValid(persist, callback);
-        ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
+        try {
+            super.markAsValid(persist, callback);
+            ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
+        } catch (HyracksDataException ex) {
+            returnPages();
+            throw ex;
+        }
+    }
+
+    @Override
+    public void returnPages() {
+        getBuddyIndex().getPageManager().returnAllPages();
+        super.returnPages();
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 029a59de63..e41a17efa7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -137,4 +137,9 @@ public interface ILSMDiskComponent extends ILSMComponent {
     ILSMDiskComponentBulkLoader createBulkLoader(ILSMIOOperation operation, 
float fillFactor, boolean verifyInput,
             long numElementsHint, boolean checkIfEmptyIndex, boolean 
withFilter, boolean cleanupEmptyComponent,
             IPageWriteCallback callback) throws HyracksDataException;
+
+    /**
+     * Returns all pages of the component to the buffer cache
+     */
+    void returnPages();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
index a778a4c1ee..542cf47b6a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
@@ -58,6 +58,12 @@ public interface ILSMIOOperationCallback {
      */
     void afterFinalize(ILSMIOOperation operation) throws HyracksDataException;
 
+    /**
+     * This method is called on an IO operation after the operation fails
+     */
+    default void afterFailure(ILSMIOOperation operation) {
+    }
+
    /**
     * This method is called after the scheduler is done with the IO operation
     * For operations that are not scheduled, this call is skipped
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index baf0931319..c7240f34b3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -158,8 +158,18 @@ public abstract class AbstractLSMDiskComponent extends 
AbstractLSMComponent impl
      */
     @Override
     public void markAsValid(boolean persist, IPageWriteFailureCallback 
callback) throws HyracksDataException {
-        ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
-        LOGGER.debug("marked {} as valid component with id {}", getIndex(), 
getId());
+        try {
+            ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
+            LOGGER.debug("marked {} as valid component with id {}", 
getIndex(), getId());
+        } catch (Exception e) {
+            returnPages();
+            throw e;
+        }
+    }
+
+    @Override
+    public void returnPages() {
+        ComponentUtils.returnPages(getMetadataHolder());
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 4f7c624996..3f60978c83 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -583,7 +583,12 @@ public abstract class AbstractLSMIndex implements 
ILSMIndex {
             throws HyracksDataException {
         ILSMDiskComponent component = factory.createComponent(this,
                 new LSMComponentFileReferences(insertFileReference, 
deleteIndexFileReference, bloomFilterFileRef));
-        component.activate(createComponent);
+        try {
+            component.activate(createComponent);
+        } catch (HyracksDataException e) {
+            component.returnPages();
+            throw e;
+        }
         return component;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
index 0214712c27..f90da79d53 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
@@ -127,9 +127,20 @@ public class ChainedLSMDiskComponentBulkLoader implements 
ILSMDiskComponentBulkL
     @Override
     public void abort() throws HyracksDataException {
         operation.setStatus(LSMIOOperationStatus.FAILURE);
-        final int bulkloadersCount = bulkloaderChain.size();
-        for (int i = 0; i < bulkloadersCount; i++) {
-            bulkloaderChain.get(i).abort();
+        HyracksDataException failure = null;
+        for (IChainedComponentBulkLoader componentBulkLoader : 
bulkloaderChain) {
+            try {
+                componentBulkLoader.abort();
+            } catch (HyracksDataException e) {
+                if (failure == null) {
+                    failure = e;
+                } else {
+                    failure.addSuppressed(e);
+                }
+            }
+        }
+        if (failure != null) {
+            throw failure;
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index 9a112ee4d5..0dfb12ef27 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -157,4 +157,9 @@ public class EmptyComponent implements ILSMDiskComponent {
     public String toString() {
         return "EmptyComponent";
     }
+
+    @Override
+    public void returnPages() {
+        // Do nothing
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
index 9346b56a80..392872477c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
@@ -21,7 +21,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
 
 import org.apache.hyracks.api.io.FileReference;
 
-public final class LSMComponentFileReferences {
+public class LSMComponentFileReferences {
 
     // The FileReference for the index that is used for inserting records of 
the component. For instance, this will be the FileReference of the RTree in one 
component of the LSM-RTree.
     private final FileReference insertIndexFileReference;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index d019a088a1..69c6ea63b2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -266,7 +266,7 @@ public class LSMHarness implements ILSMHarness {
             if (inactiveMemoryComponentsToBeCleanedUp != null) {
                 
cleanupInactiveMemoryComponents(inactiveMemoryComponentsToBeCleanedUp);
             }
-            if (opType == LSMOperationType.FLUSH) {
+            if (opType == LSMOperationType.FLUSH && !failedOperation) {
                 ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) 
ctx.getComponentHolder().get(0);
                 // We must call flushed without synchronizing on opTracker to 
avoid deadlocks
                 flushingComponent.flushed();
@@ -575,6 +575,7 @@ public class LSMHarness implements ILSMHarness {
         // if the operation failed, we need to cleanup files
         if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
             operation.cleanup(lsmIndex.getBufferCache());
+            operation.getCallback().afterFailure(operation);
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 3e17cb040d..88fa10546d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -53,7 +53,7 @@ public class LSMIndexDiskComponentBulkLoader implements 
IIndexBulkLoader {
         try {
             componentBulkLoader.add(tuple);
         } catch (Throwable th) {
-            opCtx.getIoOperation().setFailure(th);
+            fail(th);
             throw th;
         }
     }
@@ -63,7 +63,7 @@ public class LSMIndexDiskComponentBulkLoader implements 
IIndexBulkLoader {
         try {
             componentBulkLoader.delete(tuple);
         } catch (Throwable th) {
-            opCtx.getIoOperation().setFailure(th);
+            fail(th);
             throw th;
         }
     }
@@ -83,6 +83,7 @@ public class LSMIndexDiskComponentBulkLoader implements 
IIndexBulkLoader {
     @Override
     public void abort() throws HyracksDataException {
         opCtx.getIoOperation().setStatus(LSMIOOperationStatus.FAILURE);
+        fail(null);
         try {
             try {
                 componentBulkLoader.abort();
@@ -115,6 +116,7 @@ public class LSMIndexDiskComponentBulkLoader implements 
IIndexBulkLoader {
             componentBulkLoader.end();
         } catch (Throwable th) { // NOSONAR Must not call afterFinalize 
without setting failure
             fail(th);
+            componentBulkLoader.abort();
             throw th;
         } finally {
             
lsmIndex.getIOOperationCallback().afterFinalize(opCtx.getIoOperation());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMInvertedComponentFileReferences.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMInvertedComponentFileReferences.java
new file mode 100644
index 0000000000..7d10c13b20
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMInvertedComponentFileReferences.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.io.FileReference;
+
+public class LSMInvertedComponentFileReferences extends 
LSMComponentFileReferences {
+
+    private final FileReference invListsFileReference;
+
+    public LSMInvertedComponentFileReferences(FileReference 
insertIndexFileReference,
+            FileReference deleteIndexFileReference, FileReference 
bloomFilterFileReference,
+            FileReference invListsFileReference) {
+        super(insertIndexFileReference, deleteIndexFileReference, 
bloomFilterFileReference);
+        this.invListsFileReference = invListsFileReference;
+    }
+
+    public FileReference getInvListsFileReference() {
+        return invListsFileReference;
+    }
+
+    @Override
+    public FileReference[] getFileReferences() {
+        return new FileReference[] { getInsertIndexFileReference(), 
getDeleteIndexFileReference(),
+                getBloomFilterFileReference(), invListsFileReference };
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index c768768c0a..52bd07ed8b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -41,6 +41,8 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.util.ExponentialRetryPolicy;
+import org.apache.hyracks.util.IRetryPolicy;
 
 public class LSMTreeIndexAccessor implements ILSMIndexAccessor {
     @FunctionalInterface
@@ -122,6 +124,18 @@ public class LSMTreeIndexAccessor implements 
ILSMIndexAccessor {
     @Override
     public void flush(ILSMIOOperation operation) throws HyracksDataException {
         lsmHarness.flush(operation);
+        if (operation.getStatus() == 
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
+            IRetryPolicy policy = new ExponentialRetryPolicy();
+            while (operation.getStatus() == 
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
+                if (policy.retry(operation.getFailure())) {
+                    operation.setFailure(null);
+                    
operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+                    lsmHarness.flush(operation);
+                } else {
+                    break;
+                }
+            }
+        }
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
index 842ec61527..a8cbfdd400 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
@@ -169,6 +169,22 @@ public class ComponentUtils {
         }
     }
 
+    public static void returnPages(ITreeIndex treeIndex) {
+        treeIndex.getPageManager().returnAllPages();
+        IBufferCache bufferCache = treeIndex.getBufferCache();
+        // We need to return all pages to the buffer cache in case of a failure
+        try {
+            bufferCache.getCompressedPageWriter(treeIndex.getFileId()).abort();
+        } catch (IllegalStateException | NullPointerException ignored) {
+            // Since we call this method in multiple places, it is possible 
that the writer
+            // is not in the State.WRITABLE, which would throw an 
IllegalStateException.
+            // This means the writer has already written all the pages.
+            //
+            // We also catch NullPointerException in case the writer is not 
initialized.
+            // Or if the compressed page writer is not applicable to this case
+        }
+    }
+
     public static void markAsValid(IBufferCache bufferCache, BloomFilter 
filter, boolean forceToDisk)
             throws HyracksDataException {
         if (forceToDisk) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index a828d229e1..f98a05629e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -18,6 +18,9 @@
  */
 package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
 
+import static 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.DELIMITER;
+import static 
org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager.INVLISTS_SUFFIX;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -286,46 +289,56 @@ public class LSMInvertedIndex extends AbstractLSMIndex 
implements IInvertedIndex
         // Create a scan cursor on the deleted keys BTree underlying the 
in-memory inverted index.
         IIndexCursor deletedKeysScanCursor = 
deletedKeysBTreeAccessor.createSearchCursor(false);
         try {
-            deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred);
             try {
-                while (deletedKeysScanCursor.hasNext()) {
-                    deletedKeysScanCursor.next();
-                    
componentBulkLoader.delete(deletedKeysScanCursor.getTuple());
+                deletedKeysBTreeAccessor.search(deletedKeysScanCursor, 
nullPred);
+                try {
+                    while (deletedKeysScanCursor.hasNext()) {
+                        deletedKeysScanCursor.next();
+                        
componentBulkLoader.delete(deletedKeysScanCursor.getTuple());
+                    }
+                } finally {
+                    deletedKeysScanCursor.close();
                 }
             } finally {
-                deletedKeysScanCursor.close();
+                deletedKeysScanCursor.destroy();
             }
-        } finally {
-            deletedKeysScanCursor.destroy();
-        }
-        // Scan the in-memory inverted index
-        InMemoryInvertedIndexAccessor memInvIndexAccessor =
-                
flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
-        BTreeAccessor memBTreeAccessor = 
memInvIndexAccessor.getBTreeAccessor();
-        IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false);
-        try {
-            memBTreeAccessor.search(scanCursor, nullPred);
-            // Bulk load the disk inverted index from the in-memory inverted 
index.
+            // Scan the in-memory inverted index
+            InMemoryInvertedIndexAccessor memInvIndexAccessor =
+                    
flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
+            BTreeAccessor memBTreeAccessor = 
memInvIndexAccessor.getBTreeAccessor();
+            IIndexCursor scanCursor = 
memBTreeAccessor.createSearchCursor(false);
             try {
-                while (scanCursor.hasNext()) {
-                    scanCursor.next();
-                    componentBulkLoader.add(scanCursor.getTuple());
+                memBTreeAccessor.search(scanCursor, nullPred);
+                // Bulk load the disk inverted index from the in-memory 
inverted index.
+                try {
+                    while (scanCursor.hasNext()) {
+                        scanCursor.next();
+                        componentBulkLoader.add(scanCursor.getTuple());
+                    }
+                } finally {
+                    scanCursor.close();
                 }
             } finally {
-                scanCursor.close();
+                scanCursor.destroy();
             }
-        } finally {
-            scanCursor.destroy();
-        }
-        if (component.getLSMComponentFilter() != null) {
-            List<ITupleReference> filterTuples = new ArrayList<>();
-            
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
-            
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
-            filterManager.updateFilter(component.getLSMComponentFilter(), 
filterTuples, NoOpOperationCallback.INSTANCE);
-            filterManager.writeFilter(component.getLSMComponentFilter(), 
component.getMetadataHolder());
+            if (component.getLSMComponentFilter() != null) {
+                List<ITupleReference> filterTuples = new ArrayList<>();
+                
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
+                
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
+                filterManager.updateFilter(component.getLSMComponentFilter(), 
filterTuples,
+                        NoOpOperationCallback.INSTANCE);
+                filterManager.writeFilter(component.getLSMComponentFilter(), 
component.getMetadataHolder());
+            }
+            flushingComponent.getMetadata().copy(component.getMetadata());
+            componentBulkLoader.end();
+        } catch (Throwable e) {
+            try {
+                componentBulkLoader.abort();
+            } catch (Throwable th) {
+                e.addSuppressed(th);
+            }
+            throw e;
         }
-        flushingComponent.getMetadata().copy(component.getMetadata());
-        componentBulkLoader.end();
         return component;
     }
 
@@ -339,60 +352,71 @@ public class LSMInvertedIndex extends AbstractLSMIndex 
implements IInvertedIndex
         // Create an inverted index instance.
         ILSMDiskComponent component = createDiskComponent(componentFactory, 
mergeOp.getTarget(),
                 mergeOp.getDeletedKeysBTreeTarget(), 
mergeOp.getBloomFilterTarget(), true);
-        ILSMDiskComponentBulkLoader componentBulkLoader;
+        ILSMDiskComponentBulkLoader componentBulkLoader = null;
         // In case we must keep the deleted-keys BTrees, then they must be 
merged *before* merging the inverted
         // indexes so that lsmHarness.endSearch() is called once when the 
inverted indexes have been merged.
-        if 
(mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) 
!= diskComponents
-                .get(diskComponents.size() - 1)) {
-            // Keep the deleted tuples since the oldest disk component is not 
included in the merge operation
-            LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor =
-                    new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx, 
mergeOp.getCursorStats());
-            try {
-                long numElements = 0L;
-                for (int i = 0; i < mergeOp.getMergingComponents().size(); 
++i) {
-                    numElements += ((LSMInvertedIndexDiskComponent) 
mergeOp.getMergingComponents().get(i))
-                            .getBloomFilter().getNumElements();
-                }
-                componentBulkLoader = component.createBulkLoader(operation, 
1.0f, false, numElements, false, false,
-                        false, 
pageWriteCallbackFactory.createPageWriteCallback());
-                loadDeleteTuples(opCtx, btreeCursor, mergePred, 
componentBulkLoader);
-            } finally {
-                btreeCursor.destroy();
-            }
-        } else {
-            componentBulkLoader = component.createBulkLoader(operation, 1.0f, 
false, 0L, false, false, false,
-                    pageWriteCallbackFactory.createPageWriteCallback());
-        }
-        search(opCtx, cursor, mergePred);
         try {
-            while (cursor.hasNext()) {
-                cursor.next();
-                componentBulkLoader.add(cursor.getTuple());
+            if 
(mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) 
!= diskComponents
+                    .get(diskComponents.size() - 1)) {
+                // Keep the deleted tuples since the oldest disk component is 
not included in the merge operation
+                LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor =
+                        new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx, 
mergeOp.getCursorStats());
+                try {
+                    long numElements = 0L;
+                    for (int i = 0; i < mergeOp.getMergingComponents().size(); 
++i) {
+                        numElements += ((LSMInvertedIndexDiskComponent) 
mergeOp.getMergingComponents().get(i))
+                                .getBloomFilter().getNumElements();
+                    }
+                    componentBulkLoader = 
component.createBulkLoader(operation, 1.0f, false, numElements, false, false,
+                            false, 
pageWriteCallbackFactory.createPageWriteCallback());
+                    loadDeleteTuples(opCtx, btreeCursor, mergePred, 
componentBulkLoader);
+                } finally {
+                    btreeCursor.destroy();
+                }
+            } else {
+                componentBulkLoader = component.createBulkLoader(operation, 
1.0f, false, 0L, false, false, false,
+                        pageWriteCallbackFactory.createPageWriteCallback());
             }
-        } finally {
+            search(opCtx, cursor, mergePred);
             try {
-                cursor.close();
+                while (cursor.hasNext()) {
+                    cursor.next();
+                    componentBulkLoader.add(cursor.getTuple());
+                }
             } finally {
-                cursor.destroy();
+                try {
+                    cursor.close();
+                } finally {
+                    cursor.destroy();
+                }
             }
-        }
-        if (component.getLSMComponentFilter() != null) {
-            List<ITupleReference> filterTuples = new ArrayList<>();
-            for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
-                ITupleReference min = 
mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple();
-                ITupleReference max = 
mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple();
-                if (min != null) {
-                    filterTuples.add(min);
+            if (component.getLSMComponentFilter() != null) {
+                List<ITupleReference> filterTuples = new ArrayList<>();
+                for (int i = 0; i < mergeOp.getMergingComponents().size(); 
++i) {
+                    ITupleReference min = 
mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple();
+                    ITupleReference max = 
mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple();
+                    if (min != null) {
+                        filterTuples.add(min);
+                    }
+                    if (max != null) {
+                        filterTuples.add(max);
+                    }
                 }
-                if (max != null) {
-                    filterTuples.add(max);
+                
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+                        NoOpOperationCallback.INSTANCE);
+                
getFilterManager().writeFilter(component.getLSMComponentFilter(), 
component.getMetadataHolder());
+            }
+            componentBulkLoader.end();
+        } catch (Throwable e) {
+            try {
+                if (componentBulkLoader != null) {
+                    componentBulkLoader.abort();
                 }
+            } catch (Throwable th) {
+                e.addSuppressed(th);
             }
-            getFilterManager().updateFilter(component.getLSMComponentFilter(), 
filterTuples,
-                    NoOpOperationCallback.INSTANCE);
-            getFilterManager().writeFilter(component.getLSMComponentFilter(), 
component.getMetadataHolder());
+            throw e;
         }
-        componentBulkLoader.end();
 
         return component;
     }
@@ -487,7 +511,10 @@ public class LSMInvertedIndex extends AbstractLSMIndex 
implements IInvertedIndex
             throws HyracksDataException {
         return new LSMInvertedIndexFlushOperation(new 
LSMInvertedIndexAccessor(getHarness(), opCtx),
                 componentFileRefs.getInsertIndexFileReference(), 
componentFileRefs.getDeleteIndexFileReference(),
-                componentFileRefs.getBloomFilterFileReference(), callback, 
getIndexIdentifier());
+                componentFileRefs.getBloomFilterFileReference(),
+                ioManager.resolve(
+                        
getInvListsFilePath(componentFileRefs.getInsertIndexFileReference().getAbsolutePath())),
+                callback, getIndexIdentifier());
     }
 
     @Override
@@ -497,7 +524,14 @@ public class LSMInvertedIndex extends AbstractLSMIndex 
implements IInvertedIndex
         IIndexCursorStats stats = new IndexCursorStats();
         IIndexCursor cursor = new LSMInvertedIndexMergeCursor(opCtx, stats);
         return new LSMInvertedIndexMergeOperation(accessor, cursor, stats, 
mergeFileRefs.getInsertIndexFileReference(),
-                mergeFileRefs.getDeleteIndexFileReference(), 
mergeFileRefs.getBloomFilterFileReference(), callback,
-                getIndexIdentifier());
+                mergeFileRefs.getDeleteIndexFileReference(), 
mergeFileRefs.getBloomFilterFileReference(),
+                
ioManager.resolve(getInvListsFilePath(mergeFileRefs.getInsertIndexFileReference().getAbsolutePath())),
+                callback, getIndexIdentifier());
+    }
+
+    public String getInvListsFilePath(String dictBTreeFilePath) {
+        int index = dictBTreeFilePath.lastIndexOf(DELIMITER);
+        String file = dictBTreeFilePath.substring(0, index);
+        return file + DELIMITER + INVLISTS_SUFFIX;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 67312ceb23..d8900ad2af 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -37,6 +37,8 @@ import 
org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccesso
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.util.ExponentialRetryPolicy;
+import org.apache.hyracks.util.IRetryPolicy;
 
 public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, 
IInvertedIndexAccessor {
 
@@ -93,6 +95,18 @@ public class LSMInvertedIndexAccessor implements 
ILSMIndexAccessor, IInvertedInd
     @Override
     public void flush(ILSMIOOperation operation) throws HyracksDataException {
         lsmHarness.flush(operation);
+        if (operation.getStatus() == 
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
+            IRetryPolicy policy = new ExponentialRetryPolicy();
+            while (operation.getStatus() == 
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
+                if (policy.retry(operation.getFailure())) {
+                    operation.setFailure(null);
+                    
operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+                    lsmHarness.flush(operation);
+                } else {
+                    break;
+                }
+            }
+        }
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 6de6ab5b64..3edb5feead 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -105,17 +105,22 @@ public class LSMInvertedIndexDiskComponent extends 
AbstractLSMWithBuddyDiskCompo
 
     @Override
     public void markAsValid(boolean persist, IPageWriteFailureCallback 
callback) throws HyracksDataException {
-        ComponentUtils.markAsValid(getBloomFilterBufferCache(), 
getBloomFilter(), persist);
-
-        // Flush inverted index second.
-        invIndex.getBufferCache().force((invIndex).getInvListsFileId(), true);
-        ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
-        if (!callback.hasFailed()) {
-            // Flush deleted keys BTree.
-            ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
-        }
-        if (callback.hasFailed()) {
-            throw HyracksDataException.create(callback.getFailure());
+        try {
+            ComponentUtils.markAsValid(getBloomFilterBufferCache(), 
getBloomFilter(), persist);
+
+            // Flush inverted index second.
+            invIndex.getBufferCache().force((invIndex).getInvListsFileId(), 
true);
+            ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
+            if (!callback.hasFailed()) {
+                // Flush deleted keys BTree.
+                ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
+            }
+            if (callback.hasFailed()) {
+                throw HyracksDataException.create(callback.getFailure());
+            }
+        } catch (HyracksDataException ex) {
+            returnPages();
+            throw ex;
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 3b8f82a0a0..442df0b39a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -34,6 +34,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManage
 import org.apache.hyracks.storage.am.lsm.common.impls.BTreeFactory;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMInvertedComponentFileReferences;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper;
 
 // TODO: Refactor for better code sharing with other file managers.
@@ -59,17 +60,19 @@ public class LSMInvertedIndexFileManager extends 
AbstractLSMIndexFileManager imp
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() throws 
HyracksDataException {
         String baseName = getNextComponentSequence(deletedKeysBTreeFilter);
-        return new LSMComponentFileReferences(baseDir.getChild(baseName + 
DELIMITER + DICT_BTREE_SUFFIX),
+        return new 
LSMInvertedComponentFileReferences(baseDir.getChild(baseName + DELIMITER + 
DICT_BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + 
DELETED_KEYS_BTREE_SUFFIX),
-                baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
+                baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX),
+                baseDir.getChild(baseName + DELIMITER + INVLISTS_SUFFIX));
     }
 
     @Override
     public LSMComponentFileReferences getRelMergeFileReference(String 
firstFileName, String lastFileName) {
         final String baseName = 
IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
-        return new LSMComponentFileReferences(baseDir.getChild(baseName + 
DELIMITER + DICT_BTREE_SUFFIX),
+        return new 
LSMInvertedComponentFileReferences(baseDir.getChild(baseName + DELIMITER + 
DICT_BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + 
DELETED_KEYS_BTREE_SUFFIX),
-                baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
+                baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX),
+                baseDir.getChild(baseName + DELIMITER + INVLISTS_SUFFIX));
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index 5b272eeea4..efd5060f91 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -24,17 +24,20 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMInvertedComponentFileReferences;
 
 public class LSMInvertedIndexFlushOperation extends FlushOperation {
     private final FileReference deletedKeysBTreeFlushTarget;
     private final FileReference bloomFilterFlushTarget;
+    private final FileReference invListsFlushTarget;
 
     public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, 
FileReference flushTarget,
             FileReference deletedKeysBTreeFlushTarget, FileReference 
bloomFilterFlushTarget,
-            ILSMIOOperationCallback callback, String indexIdentifier) {
+            FileReference invListsFlushTarget, ILSMIOOperationCallback 
callback, String indexIdentifier) {
         super(accessor, flushTarget, callback, indexIdentifier);
         this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
+        this.invListsFlushTarget = invListsFlushTarget;
     }
 
     public FileReference getDeletedKeysBTreeTarget() {
@@ -47,6 +50,7 @@ public class LSMInvertedIndexFlushOperation extends 
FlushOperation {
 
     @Override
     public LSMComponentFileReferences getComponentFiles() {
-        return new LSMComponentFileReferences(target, 
deletedKeysBTreeFlushTarget, bloomFilterFlushTarget);
+        return new LSMInvertedComponentFileReferences(target, 
deletedKeysBTreeFlushTarget, bloomFilterFlushTarget,
+                invListsFlushTarget);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 31c79f0a62..4ebb9d3cc1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -23,6 +23,7 @@ import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMInvertedComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.IIndexCursorStats;
@@ -30,13 +31,15 @@ import org.apache.hyracks.storage.common.IIndexCursorStats;
 public class LSMInvertedIndexMergeOperation extends MergeOperation {
     private final FileReference deletedKeysBTreeMergeTarget;
     private final FileReference bloomFilterMergeTarget;
+    private final FileReference invListsMergeTarget;
 
     public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, 
IIndexCursor cursor, IIndexCursorStats stats,
             FileReference target, FileReference deletedKeysBTreeMergeTarget, 
FileReference bloomFilterMergeTarget,
-            ILSMIOOperationCallback callback, String indexIdentifier) {
+            FileReference invListsMergeTarget, ILSMIOOperationCallback 
callback, String indexIdentifier) {
         super(accessor, target, callback, indexIdentifier, cursor, stats);
         this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget;
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
+        this.invListsMergeTarget = invListsMergeTarget;
     }
 
     public FileReference getDeletedKeysBTreeTarget() {
@@ -49,7 +52,8 @@ public class LSMInvertedIndexMergeOperation extends 
MergeOperation {
 
     @Override
     public LSMComponentFileReferences getComponentFiles() {
-        return new LSMComponentFileReferences(target, 
deletedKeysBTreeMergeTarget, bloomFilterMergeTarget);
+        return new LSMInvertedComponentFileReferences(target, 
deletedKeysBTreeMergeTarget, bloomFilterMergeTarget,
+                invListsMergeTarget);
     }
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 2583d0ab69..47068a30fe 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -397,6 +397,9 @@ public class OnDiskInvertedIndex implements 
IInPlaceInvertedIndex {
             if (btreeBulkloader != null) {
                 btreeBulkloader.abort();
             }
+            if (currentPage != null) {
+                bufferCache.returnPage(currentPage, false);
+            }
         }
 
         @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index a8a190988e..8b88e7ee51 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -110,7 +110,6 @@ public class LSMRTree extends AbstractLSMRTree {
         RangePredicate btreeNullPredicate = new RangePredicate(null, null, 
true, true, null, null);
         BTreeAccessor memBTreeAccessor =
                 
flushingComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
-        boolean abort = true;
         try {
             try {
                 rTreeTupleSorter = getRTreeTupleSorter(flushingComponent, 
memBTreeAccessor, btreeNullPredicate,
@@ -142,12 +141,16 @@ public class LSMRTree extends AbstractLSMRTree {
             }
             // Note. If we change the filter to write to metadata object, we 
don't need the if block above
             flushingComponent.getMetadata().copy(component.getMetadata());
-            abort = false;
             componentBulkLoader.end();
-        } finally {
-            if (abort && componentBulkLoader != null) {
-                componentBulkLoader.abort();
+        } catch (Throwable e) {
+            try {
+                if (componentBulkLoader != null) {
+                    componentBulkLoader.abort();
+                }
+            } catch (Throwable th) {
+                e.addSuppressed(th);
             }
+            throw e;
         }
         return component;
     }
@@ -264,32 +267,35 @@ public class LSMRTree extends AbstractLSMRTree {
         IIndexCursor cursor = mergeOp.getCursor();
         ILSMDiskComponentBulkLoader componentBulkLoader = null;
         ILSMDiskComponent mergedComponent = null;
-        boolean abort = true;
         try {
-            mergedComponent = createDiskComponent(componentFactory, 
mergeOp.getTarget(), mergeOp.getBTreeTarget(),
-                    mergeOp.getBloomFilterTarget(), true);
-            componentBulkLoader = loadMergeBulkLoader(mergeOp, cursor, 
mergedComponent);
-            if (mergedComponent.getLSMComponentFilter() != null) {
-                List<ITupleReference> filterTuples = new ArrayList<>();
-                for (int i = 0; i < mergeOp.getMergingComponents().size(); 
++i) {
-                    
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
-                    
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
+            try {
+                mergedComponent = createDiskComponent(componentFactory, 
mergeOp.getTarget(), mergeOp.getBTreeTarget(),
+                        mergeOp.getBloomFilterTarget(), true);
+                componentBulkLoader = loadMergeBulkLoader(mergeOp, cursor, 
mergedComponent);
+                if (mergedComponent.getLSMComponentFilter() != null) {
+                    List<ITupleReference> filterTuples = new ArrayList<>();
+                    for (int i = 0; i < mergeOp.getMergingComponents().size(); 
++i) {
+                        
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
+                        
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
+                    }
+                    
getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), 
filterTuples,
+                            NoOpOperationCallback.INSTANCE);
+                    
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(),
+                            mergedComponent.getMetadataHolder());
                 }
-                
getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), 
filterTuples,
-                        NoOpOperationCallback.INSTANCE);
-                
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(),
-                        mergedComponent.getMetadataHolder());
+                componentBulkLoader.end();
+            } finally {
+                cursor.destroy();
             }
-            abort = false;
-            componentBulkLoader.end();
-        } finally {
+        } catch (Throwable e) {
             try {
-                cursor.destroy();
-            } finally {
-                if (abort && componentBulkLoader != null) {
+                if (componentBulkLoader != null) {
                     componentBulkLoader.abort();
                 }
+            } catch (Throwable th) {
+                e.addSuppressed(th);
             }
+            throw e;
         }
         return mergedComponent;
     }
@@ -297,46 +303,38 @@ public class LSMRTree extends AbstractLSMRTree {
     private ILSMDiskComponentBulkLoader 
loadMergeBulkLoader(LSMRTreeMergeOperation mergeOp, IIndexCursor cursor,
             ILSMDiskComponent mergedComponent) throws HyracksDataException {
         ILSMDiskComponentBulkLoader componentBulkLoader = null;
-        boolean abort = true;
         ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
         ILSMIndexOperationContext opCtx = ((LSMRTreeSortedCursor) 
cursor).getOpCtx();
         search(opCtx, cursor, rtreeSearchPred);
         try {
-            try {
-                // In case we must keep the deleted-keys BTrees, then they 
must be merged
-                // *before* merging the r-trees so that
-                // lsmHarness.endSearch() is called once when the r-trees have 
been merged.
-                if 
(mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) 
!= diskComponents
-                        .get(diskComponents.size() - 1)) {
-                    // Keep the deleted tuples since the oldest disk component
-                    // is not included in the merge operation
-                    long numElements = 0L;
-                    for (int i = 0; i < mergeOp.getMergingComponents().size(); 
++i) {
-                        numElements += ((LSMRTreeDiskComponent) 
mergeOp.getMergingComponents().get(i)).getBloomFilter()
-                                .getNumElements();
-                    }
-                    componentBulkLoader = 
mergedComponent.createBulkLoader(mergeOp, 1.0f, false, numElements, false,
-                            false, false, 
pageWriteCallbackFactory.createPageWriteCallback());
-                    mergeLoadBTree(mergeOp, opCtx, rtreeSearchPred, 
componentBulkLoader);
-                } else {
-                    //no buddy-btree needed
-                    componentBulkLoader = 
mergedComponent.createBulkLoader(mergeOp, 1.0f, false, 0L, false, false,
-                            false, 
pageWriteCallbackFactory.createPageWriteCallback());
-                }
-                //search old rtree components
-                while (cursor.hasNext()) {
-                    cursor.next();
-                    ITupleReference frameTuple = cursor.getTuple();
-                    componentBulkLoader.add(frameTuple);
+            // In case we must keep the deleted-keys BTrees, then they must be 
merged
+            // *before* merging the r-trees so that
+            // lsmHarness.endSearch() is called once when the r-trees have 
been merged.
+            if 
(mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) 
!= diskComponents
+                    .get(diskComponents.size() - 1)) {
+                // Keep the deleted tuples since the oldest disk component
+                // is not included in the merge operation
+                long numElements = 0L;
+                for (int i = 0; i < mergeOp.getMergingComponents().size(); 
++i) {
+                    numElements += ((LSMRTreeDiskComponent) 
mergeOp.getMergingComponents().get(i)).getBloomFilter()
+                            .getNumElements();
                 }
-            } finally {
-                cursor.close();
+                componentBulkLoader = 
mergedComponent.createBulkLoader(mergeOp, 1.0f, false, numElements, false, 
false,
+                        false, 
pageWriteCallbackFactory.createPageWriteCallback());
+                mergeLoadBTree(mergeOp, opCtx, rtreeSearchPred, 
componentBulkLoader);
+            } else {
+                //no buddy-btree needed
+                componentBulkLoader = 
mergedComponent.createBulkLoader(mergeOp, 1.0f, false, 0L, false, false, false,
+                        pageWriteCallbackFactory.createPageWriteCallback());
             }
-            abort = false;
-        } finally {
-            if (abort && componentBulkLoader != null) {
-                componentBulkLoader.abort();
+            //search old rtree components
+            while (cursor.hasNext()) {
+                cursor.next();
+                ITupleReference frameTuple = cursor.getTuple();
+                componentBulkLoader.add(frameTuple);
             }
+        } finally {
+            cursor.close();
         }
         return componentBulkLoader;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 729ca74363..6968b3c3c3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -103,107 +103,111 @@ public class LSMRTreeWithAntiMatterTuples extends 
AbstractLSMRTree {
         TreeTupleSorter rTreeTupleSorter = null;
         TreeTupleSorter bTreeTupleSorter = null;
         boolean isEmpty = true;
-        boolean abort = true;
         try {
-            RTreeAccessor memRTreeAccessor =
-                    
flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
             try {
-                RTreeSearchCursor rtreeScanCursor = 
memRTreeAccessor.createSearchCursor(false);
+                RTreeAccessor memRTreeAccessor =
+                        
flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
+
                 try {
-                    memRTreeAccessor.search(rtreeScanCursor, 
rtreeNullPredicate);
-                    component = createDiskComponent(componentFactory, 
flushOp.getTarget(), null, null, true);
-                    componentBulkLoader = 
component.createBulkLoader(operation, 1.0f, false, 0L, false, false, false,
-                            
pageWriteCallbackFactory.createPageWriteCallback());
-                    // Since the LSM-RTree is used as a secondary assumption, 
the
-                    // primary key will be the last comparator in the BTree 
comparators
-                    rTreeTupleSorter = new 
TreeTupleSorter(flushingComponent.getIndex().getFileId(), linearizerArray,
-                            rtreeLeafFrameFactory.createFrame(), 
rtreeLeafFrameFactory.createFrame(),
-                            flushingComponent.getIndex().getBufferCache(), 
comparatorFields);
+                    RTreeSearchCursor rtreeScanCursor = 
memRTreeAccessor.createSearchCursor(false);
                     try {
-                        isEmpty = scanAndSort(rtreeScanCursor, 
rTreeTupleSorter);
+                        memRTreeAccessor.search(rtreeScanCursor, 
rtreeNullPredicate);
+                        component = createDiskComponent(componentFactory, 
flushOp.getTarget(), null, null, true);
+                        componentBulkLoader = 
component.createBulkLoader(operation, 1.0f, false, 0L, false, false,
+                                false, 
pageWriteCallbackFactory.createPageWriteCallback());
+                        // Since the LSM-RTree is used as a secondary index, the
+                        // primary key will be the last comparator in the 
BTree comparators
+                        rTreeTupleSorter =
+                                new 
TreeTupleSorter(flushingComponent.getIndex().getFileId(), linearizerArray,
+                                        rtreeLeafFrameFactory.createFrame(), 
rtreeLeafFrameFactory.createFrame(),
+                                        
flushingComponent.getIndex().getBufferCache(), comparatorFields);
+                        try {
+                            isEmpty = scanAndSort(rtreeScanCursor, 
rTreeTupleSorter);
+                        } finally {
+                            rtreeScanCursor.close();
+                        }
                     } finally {
-                        rtreeScanCursor.close();
+                        rtreeScanCursor.destroy();
                     }
                 } finally {
-                    rtreeScanCursor.destroy();
+                    memRTreeAccessor.destroy();
                 }
-            } finally {
-                memRTreeAccessor.destroy();
-            }
-            if (!isEmpty) {
-                rTreeTupleSorter.sort();
-            }
-            // scan the memory BTree
-            RangePredicate btreeNullPredicate = new RangePredicate(null, null, 
true, true, null, null);
-            BTreeAccessor memBTreeAccessor =
-                    
flushingComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            try {
-                bTreeTupleSorter = new 
TreeTupleSorter(flushingComponent.getBuddyIndex().getFileId(), linearizerArray,
-                        btreeLeafFrameFactory.createFrame(), 
btreeLeafFrameFactory.createFrame(),
-                        flushingComponent.getBuddyIndex().getBufferCache(), 
comparatorFields);
-                BTreeRangeSearchCursor btreeScanCursor =
-                        (BTreeRangeSearchCursor) 
memBTreeAccessor.createSearchCursor(false);
+                if (!isEmpty) {
+                    rTreeTupleSorter.sort();
+                }
+                // scan the memory BTree
+                RangePredicate btreeNullPredicate = new RangePredicate(null, 
null, true, true, null, null);
+                BTreeAccessor memBTreeAccessor =
+                        
flushingComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
                 try {
-                    isEmpty = true;
-                    memBTreeAccessor.search(btreeScanCursor, 
btreeNullPredicate);
+                    bTreeTupleSorter = new 
TreeTupleSorter(flushingComponent.getBuddyIndex().getFileId(),
+                            linearizerArray, 
btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(),
+                            
flushingComponent.getBuddyIndex().getBufferCache(), comparatorFields);
+                    BTreeRangeSearchCursor btreeScanCursor =
+                            (BTreeRangeSearchCursor) 
memBTreeAccessor.createSearchCursor(false);
                     try {
-                        isEmpty = scanAndSort(btreeScanCursor, 
bTreeTupleSorter);
+                        isEmpty = true;
+                        memBTreeAccessor.search(btreeScanCursor, 
btreeNullPredicate);
+                        try {
+                            isEmpty = scanAndSort(btreeScanCursor, 
bTreeTupleSorter);
+                        } finally {
+                            btreeScanCursor.close();
+                        }
                     } finally {
-                        btreeScanCursor.close();
+                        btreeScanCursor.destroy();
                     }
                 } finally {
-                    btreeScanCursor.destroy();
+                    memBTreeAccessor.destroy();
                 }
-            } finally {
-                memBTreeAccessor.destroy();
-            }
-            if (!isEmpty) {
-                bTreeTupleSorter.sort();
-            }
-            LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new 
LSMRTreeWithAntiMatterTuplesFlushCursor(
-                    rTreeTupleSorter, bTreeTupleSorter, comparatorFields, 
linearizerArray);
-            try {
-                cursor.open(null, null);
+                if (!isEmpty) {
+                    bTreeTupleSorter.sort();
+                }
+                LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new 
LSMRTreeWithAntiMatterTuplesFlushCursor(
+                        rTreeTupleSorter, bTreeTupleSorter, comparatorFields, 
linearizerArray);
                 try {
-                    while (cursor.hasNext()) {
-                        cursor.next();
-                        ITupleReference frameTuple = cursor.getTuple();
-                        componentBulkLoader.add(frameTuple);
+                    cursor.open(null, null);
+                    try {
+                        while (cursor.hasNext()) {
+                            cursor.next();
+                            ITupleReference frameTuple = cursor.getTuple();
+                            componentBulkLoader.add(frameTuple);
+                        }
+                    } finally {
+                        cursor.close();
                     }
                 } finally {
-                    cursor.close();
+                    cursor.destroy();
                 }
-            } finally {
-                cursor.destroy();
-            }
-            if (component.getLSMComponentFilter() != null) {
-                List<ITupleReference> filterTuples = new ArrayList<>();
-                
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
-                
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
-                
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
-                        NoOpOperationCallback.INSTANCE);
-                
getFilterManager().writeFilter(component.getLSMComponentFilter(), 
component.getMetadataHolder());
-            }
-            flushingComponent.getMetadata().copy(component.getMetadata());
-            abort = false;
-            componentBulkLoader.end();
-        } finally {
-            try {
-                if (rTreeTupleSorter != null) {
-                    rTreeTupleSorter.destroy();
+                if (component.getLSMComponentFilter() != null) {
+                    List<ITupleReference> filterTuples = new ArrayList<>();
+                    
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
+                    
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
+                    
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+                            NoOpOperationCallback.INSTANCE);
+                    
getFilterManager().writeFilter(component.getLSMComponentFilter(), 
component.getMetadataHolder());
                 }
+                flushingComponent.getMetadata().copy(component.getMetadata());
+                componentBulkLoader.end();
             } finally {
                 try {
-                    if (bTreeTupleSorter != null) {
-                        bTreeTupleSorter.destroy();
+                    if (rTreeTupleSorter != null) {
+                        rTreeTupleSorter.destroy();
                     }
                 } finally {
-                    if (abort && componentBulkLoader != null) {
-                        componentBulkLoader.abort();
+                    if (bTreeTupleSorter != null) {
+                        bTreeTupleSorter.destroy();
                     }
                 }
             }
+        } catch (Throwable e) {
+            try {
+                if (componentBulkLoader != null) {
+                    componentBulkLoader.abort();
+                }
+            } catch (Throwable th) {
+                e.addSuppressed(th);
+            }
+            throw e;
         }
         return component;
     }
@@ -243,26 +247,35 @@ public class LSMRTreeWithAntiMatterTuples extends 
AbstractLSMRTree {
         ILSMDiskComponentBulkLoader componentBulkLoader = 
component.createBulkLoader(operation, 1.0f, false, 0L, false,
                 false, false, 
pageWriteCallbackFactory.createPageWriteCallback());
         try {
-            while (cursor.hasNext()) {
-                cursor.next();
-                ITupleReference frameTuple = cursor.getTuple();
-                componentBulkLoader.add(frameTuple);
+            try {
+                while (cursor.hasNext()) {
+                    cursor.next();
+                    ITupleReference frameTuple = cursor.getTuple();
+                    componentBulkLoader.add(frameTuple);
+                }
+            } finally {
+                cursor.close();
             }
-        } finally {
-            cursor.close();
-        }
-        if (component.getLSMComponentFilter() != null) {
-            List<ITupleReference> filterTuples = new ArrayList<>();
-            for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
-                
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
-                
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
+            if (component.getLSMComponentFilter() != null) {
+                List<ITupleReference> filterTuples = new ArrayList<>();
+                for (int i = 0; i < mergeOp.getMergingComponents().size(); 
++i) {
+                    
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
+                    
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
+                }
+                
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+                        NoOpOperationCallback.INSTANCE);
+                
getFilterManager().writeFilter(component.getLSMComponentFilter(), 
component.getMetadataHolder());
             }
-            getFilterManager().updateFilter(component.getLSMComponentFilter(), 
filterTuples,
-                    NoOpOperationCallback.INSTANCE);
-            getFilterManager().writeFilter(component.getLSMComponentFilter(), 
component.getMetadataHolder());
-        }
 
-        componentBulkLoader.end();
+            componentBulkLoader.end();
+        } catch (Throwable e) {
+            try {
+                componentBulkLoader.abort();
+            } catch (Throwable th) {
+                e.addSuppressed(th);
+            }
+            throw e;
+        }
 
         return component;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 35785d3e48..33ba01ecdc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -976,17 +976,22 @@ public class RTree extends AbstractTreeIndex {
 
         @Override
         public void end() throws HyracksDataException {
-            pagesToWrite.clear();
-            //if writing a trivial 1-page tree, don't try and propagate up
-            if (nodeFrontiers.size() > 1) {
-                propagateBulk(1, true, pagesToWrite);
-            }
+            try {
+                pagesToWrite.clear();
+                //if writing a trivial 1-page tree, don't try and propagate up
+                if (nodeFrontiers.size() > 1) {
+                    propagateBulk(1, true, pagesToWrite);
+                }
 
-            for (ICachedPage c : pagesToWrite) {
-                write(c);
+                for (ICachedPage c : pagesToWrite) {
+                    write(c);
+                }
+                finish();
+                super.end();
+            } catch (HyracksDataException e) {
+                handleException();
+                throw e;
             }
-            finish();
-            super.end();
         }
 
         @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 4c2395fd12..a4ecb6f490 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -499,11 +499,13 @@ public class BufferCache implements IBufferCacheInternal, 
ILifeCycleComponent, I
         synchronized (cachedPages) {
             for (ICachedPageInternal internalPage : cachedPages) {
                 CachedPage c = (CachedPage) internalPage;
-                if (c.confiscated() || c.latch.getReadLockCount() != 0 || 
c.latch.getWriteHoldCount() != 0) {
-                    return false;
-                }
-                if (c.valid) {
-                    reachableDpids.add(c.dpid);
+                if (c != null) {
+                    if (c.confiscated() || c.latch.getReadLockCount() != 0 || 
c.latch.getWriteHoldCount() != 0) {
+                        return false;
+                    }
+                    if (c.valid) {
+                        reachableDpids.add(c.dpid);
+                    }
                 }
             }
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
index 7d4e119bef..219275e4b9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
@@ -15,6 +15,8 @@
 
 package org.apache.hyracks.storage.common.buffercache;
 
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.LogManager;
@@ -38,18 +40,21 @@ public class FIFOLocalWriter implements IFIFOPageWriter {
         this.context = context;
     }
 
-    @SuppressWarnings("squid:S1181") // System must halt on all IO errors
+    @SuppressWarnings("squid:S1181")
     @Override
-    public void write(ICachedPage page) {
+    public void write(ICachedPage page) throws HyracksDataException {
         CachedPage cPage = (CachedPage) page;
         try {
             callback.beforeWrite(cPage);
             bufferCache.write(cPage, context);
             callback.afterWrite(cPage);
+        } catch (HyracksDataException e) {
+            LOGGER.warn("Failed to write page {}", cPage, e);
+            throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, 
e);
         } catch (Throwable th) {
             // Halt
-            LOGGER.error("Failed to write page {}", cPage, th);
-            ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
+            LOGGER.error("FIFOLocalWriter has encountered a fatal error", th);
+            ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
         } finally {
             bufferCache.returnPage(cPage);
             if (DEBUG) {
@@ -57,5 +62,4 @@ public class FIFOLocalWriter implements IFIFOPageWriter {
             }
         }
     }
-
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
index 31e0e85b1f..2dc4a585f5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
@@ -15,7 +15,9 @@
 
 package org.apache.hyracks.storage.common.buffercache;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 @FunctionalInterface
 public interface IFIFOPageWriter {
-    void write(ICachedPage page);
+    void write(ICachedPage page) throws HyracksDataException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
index b332a2de14..080b9ea16a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
@@ -21,22 +21,31 @@ package org.apache.hyracks.util;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 public class ExponentialRetryPolicy implements IRetryPolicy {
 
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public static final String CLOUD_UNSTABLE_MODE = "cloud.unstable.mode";
     private static final int DEFAULT_MAX_RETRIES = 10;
     private static final long DEFAULT_INITIAL_DELAY_IN_MILLIS = 100;
     private static final long DEFAULT_MAX_DELAY_IN_MILLIS = Long.MAX_VALUE - 1;
+    private static final int UNSTABLE_NUMBER_OF_RETRIES = 100;
     private final int maxRetries;
     private final long maxDelay;
     private int attempt = 0;
     private long delay;
+    private boolean printDebugLines = true;
 
     /**
      * Default constructor for ExponentialRetryPolicy.
      * Initializes with default max retries, initial delay, and max delay.
      */
     public ExponentialRetryPolicy() {
-        this(DEFAULT_MAX_RETRIES, DEFAULT_INITIAL_DELAY_IN_MILLIS, 
DEFAULT_MAX_DELAY_IN_MILLIS);
+        this(isUnstable() ? UNSTABLE_NUMBER_OF_RETRIES : DEFAULT_MAX_RETRIES, 
DEFAULT_INITIAL_DELAY_IN_MILLIS,
+                isUnstable() ? 0 : DEFAULT_MAX_DELAY_IN_MILLIS);
     }
 
     /**
@@ -71,13 +80,18 @@ public class ExponentialRetryPolicy implements IRetryPolicy 
{
      */
     public ExponentialRetryPolicy(int maxRetries, long maxDelay) {
         this(maxRetries, DEFAULT_INITIAL_DELAY_IN_MILLIS, maxDelay);
+        printDebugLines = false;
     }
 
     @Override
     public boolean retry(Throwable failure) {
         if (attempt < maxRetries) {
             try {
-                
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1 + delay));
+                long sleepTime = ThreadLocalRandom.current().nextLong(1 + 
delay);
+                if (printDebugLines) {
+                    LOGGER.info("Retrying after {}ms, attempt: {}/{}", 
sleepTime, attempt + 1, maxRetries);
+                }
+                TimeUnit.MILLISECONDS.sleep(sleepTime);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
@@ -87,4 +101,8 @@ public class ExponentialRetryPolicy implements IRetryPolicy {
         }
         return false;
     }
+
+    private static boolean isUnstable() {
+        return Boolean.getBoolean(CLOUD_UNSTABLE_MODE);
+    }
 }


Reply via email to