This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit c2cb7533ab9e8ec9170090f1222d6db3d39a0497
Author: Ali Alsuliman <[email protected]>
AuthorDate: Fri Apr 4 19:38:14 2025 -0700

    [ASTERIXDB-3591][RT] Ensure close/fail of pipeline start uninterrupted
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - propagate interrupted exception while waiting in retry() of
    ExponentialRetryPolicy.
    
    Ext-ref: MB-66048
    
    Change-Id: I49f859e1b8b72f7ae5e7bdbbb759389c6789fa0b
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19623
    Reviewed-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
---
 asterixdb/asterix-app/pom.xml                      |  3 +-
 ...Test.java => CloudStorageCancellationTest.java} | 51 ++++-------------
 .../test/cloud_storage/CloudStorageTest.java       | 24 ++++----
 .../test/common/CancellationTestExecutor.java      |  8 +++
 .../algebricks/algebricks-runtime/pom.xml          |  5 ++
 .../meta/AlgebricksMetaOperatorDescriptor.java     | 21 +++----
 .../org/apache/hyracks/api/util/InvokeUtil.java    | 64 ++++++++++++++++++++++
 .../cloud/util/CloudRetryableRequestUtil.java      |  8 ++-
 .../java/org/apache/hyracks/control/nc/Task.java   | 35 ++++++------
 .../am/lsm/common/impls/LSMTreeIndexAccessor.java  | 17 ++++--
 .../impls/LSMInvertedIndexAccessor.java            | 17 ++++--
 .../hyracks/util/ExponentialRetryPolicy.java       | 14 ++---
 .../java/org/apache/hyracks/util/IRetryPolicy.java |  3 +-
 13 files changed, 165 insertions(+), 105 deletions(-)

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index c801283ab1..f74a99512b 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -490,7 +490,7 @@
       <id>asterix-gerrit-asterix-app</id>
       <properties>
         <test.excludes>
-          
**/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
+          
**/CloudStorageTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
           
**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,
           
**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,
           
**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,
@@ -611,6 +611,7 @@
       <properties>
         <test.includes>
           **/CloudStorageTest.java,
+          **/CloudStorageCancellationTest.java,
           **/SqlppSinglePointLookupExecutionTest.java, **/AwsS3*.java
         </test.includes>
         <failIfNoTests>false</failIfNoTests>
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
similarity index 53%
copy from 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
copy to 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
index 498f060d22..2e1f94ff96 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
@@ -18,19 +18,21 @@
  */
 package org.apache.asterix.test.cloud_storage;
 
-import java.net.URI;
+import static 
org.apache.asterix.test.cloud_storage.CloudStorageTest.DELTA_RESULT_PATH;
+import static 
org.apache.asterix.test.cloud_storage.CloudStorageTest.EXCLUDED_TESTS;
+import static 
org.apache.asterix.test.cloud_storage.CloudStorageTest.ONLY_TESTS;
+import static 
org.apache.asterix.test.cloud_storage.CloudStorageTest.SUITE_TESTS;
+
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
-import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.CancellationTestExecutor;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.test.runtime.LangExecutionUtil;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.asterix.testframework.xml.Description;
 import org.apache.asterix.testframework.xml.TestCase;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.BeforeClass;
@@ -41,54 +43,23 @@ import org.junit.runners.MethodSorters;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
-
 /**
  * Run tests in cloud deployment environment
  */
 @RunWith(Parameterized.class)
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class CloudStorageTest {
-
-    private static final Logger LOGGER = LogManager.getLogger();
+public class CloudStorageCancellationTest {
 
     private final TestCaseContext tcCtx;
-    private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
-    private static final String ONLY_TESTS = 
"testsuite_cloud_storage_only.xml";
-    private static final String CONFIG_FILE_NAME = 
"src/test/resources/cc-cloud-storage.conf";
-    private static final String DELTA_RESULT_PATH = "results_cloud";
-    private static final String EXCLUDED_TESTS = "MP";
 
-    private static final String PLAYGROUND_CONTAINER = "playground";
-    private static final String MOCK_SERVER_REGION = "us-west-2";
-    private static final int MOCK_SERVER_PORT = 8001;
-    private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:"; + 
MOCK_SERVER_PORT;
-
-    public CloudStorageTest(TestCaseContext tcCtx) {
+    public CloudStorageCancellationTest(TestCaseContext tcCtx) {
         this.tcCtx = tcCtx;
     }
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
-        TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
-        testExecutor.executorId = "cloud";
-        testExecutor.stripSubstring = "//DB:";
-        LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
-        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, 
CONFIG_FILE_NAME);
-
-        // create the playground bucket and leave it empty, just for external 
collection-based tests
-        S3ClientBuilder builder = S3Client.builder();
-        URI endpoint = URI.create(MOCK_SERVER_HOSTNAME); // endpoint pointing 
to S3 mock server
-        
builder.region(Region.of(MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create())
-                .endpointOverride(endpoint);
-        S3Client client = builder.build();
-        
client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
-        client.close();
+        TestExecutor testExecutor = new 
CancellationTestExecutor(DELTA_RESULT_PATH);
+        CloudStorageTest.setupEnv(testExecutor);
     }
 
     @AfterClass
@@ -97,7 +68,7 @@ public class CloudStorageTest {
         LocalCloudUtilAdobeMock.shutdownSilently();
     }
 
-    @Parameters(name = "CloudStorageTest {index}: {0}")
+    @Parameters(name = "CloudStorageCancellationTest {index}: {0}")
     public static Collection<Object[]> tests() throws Exception {
         return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
     }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
index 498f060d22..340583810e 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
@@ -57,16 +57,16 @@ public class CloudStorageTest {
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final TestCaseContext tcCtx;
-    private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
-    private static final String ONLY_TESTS = 
"testsuite_cloud_storage_only.xml";
-    private static final String CONFIG_FILE_NAME = 
"src/test/resources/cc-cloud-storage.conf";
-    private static final String DELTA_RESULT_PATH = "results_cloud";
-    private static final String EXCLUDED_TESTS = "MP";
+    public static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+    public static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+    public static final String CONFIG_FILE_NAME = 
"src/test/resources/cc-cloud-storage.conf";
+    public static final String DELTA_RESULT_PATH = "results_cloud";
+    public static final String EXCLUDED_TESTS = "MP";
 
-    private static final String PLAYGROUND_CONTAINER = "playground";
-    private static final String MOCK_SERVER_REGION = "us-west-2";
-    private static final int MOCK_SERVER_PORT = 8001;
-    private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:"; + 
MOCK_SERVER_PORT;
+    public static final String PLAYGROUND_CONTAINER = "playground";
+    public static final String MOCK_SERVER_REGION = "us-west-2";
+    public static final int MOCK_SERVER_PORT = 8001;
+    public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:"; + 
MOCK_SERVER_PORT;
 
     public CloudStorageTest(TestCaseContext tcCtx) {
         this.tcCtx = tcCtx;
@@ -74,8 +74,12 @@ public class CloudStorageTest {
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
         TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+        setupEnv(testExecutor);
+    }
+
+    public static void setupEnv(TestExecutor testExecutor) throws Exception {
+        LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
         testExecutor.executorId = "cloud";
         testExecutor.stripSubstring = "//DB:";
         LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index b4b73528e8..dac71ce271 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -49,6 +49,14 @@ public class CancellationTestExecutor extends TestExecutor {
 
     private final ExecutorService executor = 
Executors.newSingleThreadExecutor();
 
+    public CancellationTestExecutor() {
+        super();
+    }
+
+    public CancellationTestExecutor(String deltaPath) {
+        super(deltaPath);
+    }
+
     @Override
     public InputStream executeQueryService(String str, 
TestCaseContext.OutputFormat fmt, URI uri,
             List<TestCase.CompilationUnit.Parameter> params, boolean 
jsonEncoded, Charset responseCharset,
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml 
b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
index 7a272cc546..8f95e34a0a 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
@@ -47,6 +47,11 @@
       <artifactId>hyracks-dataflow-std</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>algebricks-common</artifactId>
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index f6d9cd15b1..560f8173b3 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -38,6 +38,7 @@ import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.api.util.InvokeUtil;
 import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -170,25 +171,21 @@ public class AlgebricksMetaOperatorDescriptor extends 
AbstractSingleActivityOper
             PipelineAssembler pa =
                     new PipelineAssembler(pipeline, inputArity, outputArity, 
null, pipelineOutputRecordDescriptor);
             startOfPipeline = pa.assemblePipeline(writer, ctx, new 
HashMap<>());
-            HyracksDataException exception = null;
+            Exception exception = null;
             try {
                 startOfPipeline.open();
             } catch (Exception e) {
-                startOfPipeline.fail();
-                exception = HyracksDataException.create(e);
+                exception = e;
             } finally {
-                try {
-                    startOfPipeline.close();
-                } catch (Exception e) {
-                    if (exception == null) {
-                        exception = HyracksDataException.create(e);
-                    } else {
-                        exception.addSuppressed(e);
-                    }
+                if (exception != null) {
+                    exception = 
InvokeUtil.tryUninterruptibleWithCleanups(exception, startOfPipeline::fail,
+                            startOfPipeline::close);
+                } else {
+                    exception = InvokeUtil.runUninterruptible(exception, 
startOfPipeline::close);
                 }
             }
             if (exception != null) {
-                throw exception;
+                throw HyracksDataException.create(exception);
             }
         }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index ff0a3c542b..932642720c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -285,6 +285,57 @@ public class InvokeUtil {
         }
     }
 
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", 
"UnreachableCode" })
+    // catching Throwable, instanceofs, false-positive unreachable code
+    public static Exception tryUninterruptibleWithCleanups(Exception root, 
ThrowingAction action,
+            ThrowingAction... cleanups) {
+        try {
+            tryUninterruptibleWithCleanups(action, cleanups);
+        } catch (Exception e) {
+            root = ExceptionUtils.suppress(root, e);
+        }
+        return root;
+    }
+
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", 
"UnreachableCode" })
+    // catching Throwable, instanceofs, false-positive unreachable code
+    public static void tryUninterruptibleWithCleanups(ThrowingAction action, 
ThrowingAction... cleanups)
+            throws Exception {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            runUninterruptible(action);
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (ThrowingAction cleanup : cleanups) {
+                try {
+                    runUninterruptible(cleanup);
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted = suppressedInterrupted || t 
instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        if (savedT instanceof Error) {
+            throw (Error) savedT;
+        } else if (savedT instanceof Exception) {
+            throw (Exception) savedT;
+        } else {
+            throw HyracksDataException.create(savedT);
+        }
+    }
+
     // catching Throwable, instanceofs, false-positive unreachable code
     public static void tryWithCleanups(ThrowingAction action, 
ThrowingConsumer<Throwable>... cleanups)
             throws Exception {
@@ -422,6 +473,19 @@ public class InvokeUtil {
         }
     }
 
+    /**
+     * Runs the supplied action, after suspending any pending interruption. An 
error will be logged if
+     * the action is itself interrupted.
+     */
+    public static Exception runUninterruptible(Exception root, ThrowingAction 
action) {
+        try {
+            runUninterruptible(action);
+        } catch (Exception e) {
+            root = ExceptionUtils.suppress(root, e);
+        }
+        return root;
+    }
+
     /**
      * Runs the supplied action, after suspending any pending interruption. An 
error will be logged if
      * the action is itself interrupted.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index c36e4a7cba..da9c8fb566 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -166,8 +166,12 @@ public class CloudRetryableRequestUtil {
                     LOGGER.warn("Lost suppressed interrupt during 
ICloudReturnableRequest", e);
                     Thread.currentThread().interrupt();
                 }
-                if (Thread.currentThread().isInterrupted() || 
!retryPolicy.retry(e)) {
-                    throw HyracksDataException.create(e);
+                try {
+                    if (Thread.currentThread().isInterrupted() || 
!retryPolicy.retry(e)) {
+                        throw HyracksDataException.create(e);
+                    }
+                } catch (InterruptedException interruptedEx) {
+                    throw HyracksDataException.create(interruptedEx);
                 }
                 attempt++;
                 retry.beforeRetry();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 0bf74ee1cb..614a22625d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -63,8 +63,8 @@ import 
org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.profiling.StatsCollector;
@@ -420,30 +420,29 @@ public class Task implements IHyracksTaskContext, 
ICounterContext, Runnable {
                 IFrameReader reader = collector.getReader();
                 reader.open();
                 try {
-                    try {
-                        writer.open();
-                        VSizeFrame frame = new VSizeFrame(this);
-                        while (reader.nextFrame(frame)) {
-                            if (aborted) {
-                                return;
-                            }
-                            ByteBuffer buffer = frame.getBuffer();
-                            writer.nextFrame(buffer);
-                            buffer.compact();
+                    writer.open();
+                    VSizeFrame frame = new VSizeFrame(this);
+                    while (reader.nextFrame(frame)) {
+                        if (aborted) {
+                            return;
                         }
-                    } catch (Exception e) {
-                        originalEx = e;
-                        CleanupUtils.fail(writer, originalEx);
-                    } finally {
-                        originalEx = CleanupUtils.closeSilently(writer, 
originalEx);
+                        ByteBuffer buffer = frame.getBuffer();
+                        writer.nextFrame(buffer);
+                        buffer.compact();
                     }
+                } catch (Exception e) {
+                    originalEx = e;
                 } finally {
-                    originalEx = CleanupUtils.closeSilently(reader, 
originalEx);
+                    if (originalEx != null) {
+                        
InvokeUtil.tryUninterruptibleWithCleanups(writer::fail, writer::close, 
reader::close);
+                    } else {
+                        
InvokeUtil.tryUninterruptibleWithCleanups(writer::close, reader::close);
+                    }
                 }
             } catch (Exception e) {
                 originalEx = ExceptionUtils.suppress(originalEx, e);
             } finally {
-                originalEx = CleanupUtils.closeSilently(collector, originalEx);
+                InvokeUtil.runUninterruptible(collector::close);
             }
         } catch (Exception e) {
             originalEx = ExceptionUtils.suppress(originalEx, e);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 52bd07ed8b..7ec4f8638e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -127,12 +127,17 @@ public class LSMTreeIndexAccessor implements 
ILSMIndexAccessor {
         if (operation.getStatus() == 
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
             IRetryPolicy policy = new ExponentialRetryPolicy();
             while (operation.getStatus() == 
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
-                if (policy.retry(operation.getFailure())) {
-                    operation.setFailure(null);
-                    
operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
-                    lsmHarness.flush(operation);
-                } else {
-                    break;
+                try {
+                    if (policy.retry(operation.getFailure())) {
+                        operation.setFailure(null);
+                        
operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+                        lsmHarness.flush(operation);
+                    } else {
+                        break;
+                    }
+                } catch (InterruptedException e) {
+                    // in reality, this thread won't be interrupted
+                    throw HyracksDataException.create(e);
                 }
             }
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index d8900ad2af..ee63547d79 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -98,12 +98,17 @@ public class LSMInvertedIndexAccessor implements 
ILSMIndexAccessor, IInvertedInd
         if (operation.getStatus() == 
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
             IRetryPolicy policy = new ExponentialRetryPolicy();
             while (operation.getStatus() == 
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
-                if (policy.retry(operation.getFailure())) {
-                    operation.setFailure(null);
-                    
operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
-                    lsmHarness.flush(operation);
-                } else {
-                    break;
+                try {
+                    if (policy.retry(operation.getFailure())) {
+                        operation.setFailure(null);
+                        
operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+                        lsmHarness.flush(operation);
+                    } else {
+                        break;
+                    }
+                } catch (InterruptedException e) {
+                    // in reality, this thread won't be interrupted
+                    throw HyracksDataException.create(e);
                 }
             }
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
index 080b9ea16a..72a3bb07e8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
@@ -84,17 +84,13 @@ public class ExponentialRetryPolicy implements IRetryPolicy 
{
     }
 
     @Override
-    public boolean retry(Throwable failure) {
+    public boolean retry(Throwable failure) throws InterruptedException {
         if (attempt < maxRetries) {
-            try {
-                long sleepTime = ThreadLocalRandom.current().nextLong(1 + 
delay);
-                if (printDebugLines) {
-                    LOGGER.info("Retrying after {}ms, attempt: {}/{}", 
sleepTime, attempt + 1, maxRetries);
-                }
-                TimeUnit.MILLISECONDS.sleep(sleepTime);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+            long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay);
+            if (printDebugLines) {
+                LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, 
attempt + 1, maxRetries);
             }
+            TimeUnit.MILLISECONDS.sleep(sleepTime);
             attempt++;
             delay = delay > maxDelay / 2 ? maxDelay : delay * 2;
             return true;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
index 0d18a2bf75..2fd019136b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
@@ -24,6 +24,7 @@ public interface IRetryPolicy {
      * @param failure
      *            the cause of the failure (this cannot be null)
      * @return true if one more attempt should be done
+     * @throws InterruptedException if the retry policy can be interrupted
      */
-    boolean retry(Throwable failure);
+    boolean retry(Throwable failure) throws InterruptedException;
 }

Reply via email to