This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit c2cb7533ab9e8ec9170090f1222d6db3d39a0497 Author: Ali Alsuliman <[email protected]> AuthorDate: Fri Apr 4 19:38:14 2025 -0700 [ASTERIXDB-3591][RT] Ensure close/fail of pipeline start uninterrupted - user model changes: no - storage format changes: no - interface changes: no Details: - propagate interrupted exception while waiting in retry() of ExponentialRetryPolicy. Ext-ref: MB-66048 Change-Id: I49f859e1b8b72f7ae5e7bdbbb759389c6789fa0b Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19623 Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- asterixdb/asterix-app/pom.xml | 3 +- ...Test.java => CloudStorageCancellationTest.java} | 51 ++++------------- .../test/cloud_storage/CloudStorageTest.java | 24 ++++---- .../test/common/CancellationTestExecutor.java | 8 +++ .../algebricks/algebricks-runtime/pom.xml | 5 ++ .../meta/AlgebricksMetaOperatorDescriptor.java | 21 +++---- .../org/apache/hyracks/api/util/InvokeUtil.java | 64 ++++++++++++++++++++++ .../cloud/util/CloudRetryableRequestUtil.java | 8 ++- .../java/org/apache/hyracks/control/nc/Task.java | 35 ++++++------ .../am/lsm/common/impls/LSMTreeIndexAccessor.java | 17 ++++-- .../impls/LSMInvertedIndexAccessor.java | 17 ++++-- .../hyracks/util/ExponentialRetryPolicy.java | 14 ++--- .../java/org/apache/hyracks/util/IRetryPolicy.java | 3 +- 13 files changed, 165 insertions(+), 105 deletions(-) diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index c801283ab1..f74a99512b 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -490,7 +490,7 @@ <id>asterix-gerrit-asterix-app</id> <properties> <test.excludes> - **/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java, + **/CloudStorageTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java, 
**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java, **/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java, **/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java, @@ -611,6 +611,7 @@ <properties> <test.includes> **/CloudStorageTest.java, + **/CloudStorageCancellationTest.java, **/SqlppSinglePointLookupExecutionTest.java, **/AwsS3*.java </test.includes> <failIfNoTests>false</failIfNoTests> diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java similarity index 53% copy from asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java copy to asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java index 498f060d22..2e1f94ff96 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java @@ -18,19 +18,21 @@ */ package org.apache.asterix.test.cloud_storage; -import java.net.URI; +import static org.apache.asterix.test.cloud_storage.CloudStorageTest.DELTA_RESULT_PATH; +import static org.apache.asterix.test.cloud_storage.CloudStorageTest.EXCLUDED_TESTS; +import static org.apache.asterix.test.cloud_storage.CloudStorageTest.ONLY_TESTS; +import static org.apache.asterix.test.cloud_storage.CloudStorageTest.SUITE_TESTS; + import java.util.Collection; import java.util.List; import org.apache.asterix.api.common.LocalCloudUtilAdobeMock; -import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.test.common.CancellationTestExecutor; import org.apache.asterix.test.common.TestExecutor; import 
org.apache.asterix.test.runtime.LangExecutionUtil; import org.apache.asterix.testframework.context.TestCaseContext; import org.apache.asterix.testframework.xml.Description; import org.apache.asterix.testframework.xml.TestCase; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.junit.AfterClass; import org.junit.Assume; import org.junit.BeforeClass; @@ -41,54 +43,23 @@ import org.junit.runners.MethodSorters; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3ClientBuilder; -import software.amazon.awssdk.services.s3.model.CreateBucketRequest; - /** * Run tests in cloud deployment environment */ @RunWith(Parameterized.class) @FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class CloudStorageTest { - - private static final Logger LOGGER = LogManager.getLogger(); +public class CloudStorageCancellationTest { private final TestCaseContext tcCtx; - private static final String SUITE_TESTS = "testsuite_cloud_storage.xml"; - private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml"; - private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf"; - private static final String DELTA_RESULT_PATH = "results_cloud"; - private static final String EXCLUDED_TESTS = "MP"; - private static final String PLAYGROUND_CONTAINER = "playground"; - private static final String MOCK_SERVER_REGION = "us-west-2"; - private static final int MOCK_SERVER_PORT = 8001; - private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT; - - public CloudStorageTest(TestCaseContext tcCtx) { + public CloudStorageCancellationTest(TestCaseContext tcCtx) { this.tcCtx = tcCtx; } @BeforeClass public static void setUp() throws 
Exception { - LocalCloudUtilAdobeMock.startS3CloudEnvironment(true); - TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH); - testExecutor.executorId = "cloud"; - testExecutor.stripSubstring = "//DB:"; - LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor); - System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME); - - // create the playground bucket and leave it empty, just for external collection-based tests - S3ClientBuilder builder = S3Client.builder(); - URI endpoint = URI.create(MOCK_SERVER_HOSTNAME); // endpoint pointing to S3 mock server - builder.region(Region.of(MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create()) - .endpointOverride(endpoint); - S3Client client = builder.build(); - client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build()); - client.close(); + TestExecutor testExecutor = new CancellationTestExecutor(DELTA_RESULT_PATH); + CloudStorageTest.setupEnv(testExecutor); } @AfterClass @@ -97,7 +68,7 @@ public class CloudStorageTest { LocalCloudUtilAdobeMock.shutdownSilently(); } - @Parameters(name = "CloudStorageTest {index}: {0}") + @Parameters(name = "CloudStorageCancellationTest {index}: {0}") public static Collection<Object[]> tests() throws Exception { return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java index 498f060d22..340583810e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java @@ -57,16 +57,16 @@ public class CloudStorageTest { private static final Logger LOGGER = LogManager.getLogger(); private final TestCaseContext tcCtx; - private static final String SUITE_TESTS = 
"testsuite_cloud_storage.xml"; - private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml"; - private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf"; - private static final String DELTA_RESULT_PATH = "results_cloud"; - private static final String EXCLUDED_TESTS = "MP"; + public static final String SUITE_TESTS = "testsuite_cloud_storage.xml"; + public static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml"; + public static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf"; + public static final String DELTA_RESULT_PATH = "results_cloud"; + public static final String EXCLUDED_TESTS = "MP"; - private static final String PLAYGROUND_CONTAINER = "playground"; - private static final String MOCK_SERVER_REGION = "us-west-2"; - private static final int MOCK_SERVER_PORT = 8001; - private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT; + public static final String PLAYGROUND_CONTAINER = "playground"; + public static final String MOCK_SERVER_REGION = "us-west-2"; + public static final int MOCK_SERVER_PORT = 8001; + public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT; public CloudStorageTest(TestCaseContext tcCtx) { this.tcCtx = tcCtx; @@ -74,8 +74,12 @@ public class CloudStorageTest { @BeforeClass public static void setUp() throws Exception { - LocalCloudUtilAdobeMock.startS3CloudEnvironment(true); TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH); + setupEnv(testExecutor); + } + + public static void setupEnv(TestExecutor testExecutor) throws Exception { + LocalCloudUtilAdobeMock.startS3CloudEnvironment(true); testExecutor.executorId = "cloud"; testExecutor.stripSubstring = "//DB:"; LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java index b4b73528e8..dac71ce271 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java @@ -49,6 +49,14 @@ public class CancellationTestExecutor extends TestExecutor { private final ExecutorService executor = Executors.newSingleThreadExecutor(); + public CancellationTestExecutor() { + super(); + } + + public CancellationTestExecutor(String deltaPath) { + super(deltaPath); + } + @Override public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri, List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, Charset responseCharset, diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml index 7a272cc546..8f95e34a0a 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml +++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml @@ -47,6 +47,11 @@ <artifactId>hyracks-dataflow-std</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-util</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>algebricks-common</artifactId> diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index f6d9cd15b1..560f8173b3 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java 
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -38,6 +38,7 @@ import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.api.job.profiling.IOperatorStats; import org.apache.hyracks.api.job.profiling.NoOpOperatorStats; import org.apache.hyracks.api.job.profiling.OperatorStats; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; @@ -170,25 +171,21 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor); startOfPipeline = pa.assemblePipeline(writer, ctx, new HashMap<>()); - HyracksDataException exception = null; + Exception exception = null; try { startOfPipeline.open(); } catch (Exception e) { - startOfPipeline.fail(); - exception = HyracksDataException.create(e); + exception = e; } finally { - try { - startOfPipeline.close(); - } catch (Exception e) { - if (exception == null) { - exception = HyracksDataException.create(e); - } else { - exception.addSuppressed(e); - } + if (exception != null) { + exception = InvokeUtil.tryUninterruptibleWithCleanups(exception, startOfPipeline::fail, + startOfPipeline::close); + } else { + exception = InvokeUtil.runUninterruptible(exception, startOfPipeline::close); } } if (exception != null) { - throw exception; + throw HyracksDataException.create(exception); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index 
ff0a3c542b..932642720c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -285,6 +285,57 @@ public class InvokeUtil { } } + @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" }) + // catching Throwable, instanceofs, false-positive unreachable code + public static Exception tryUninterruptibleWithCleanups(Exception root, ThrowingAction action, + ThrowingAction... cleanups) { + try { + tryUninterruptibleWithCleanups(action, cleanups); + } catch (Exception e) { + root = ExceptionUtils.suppress(root, e); + } + return root; + } + + @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" }) + // catching Throwable, instanceofs, false-positive unreachable code + public static void tryUninterruptibleWithCleanups(ThrowingAction action, ThrowingAction... cleanups) + throws Exception { + Throwable savedT = null; + boolean suppressedInterrupted = false; + try { + runUninterruptible(action); + } catch (Throwable t) { + savedT = t; + } finally { + for (ThrowingAction cleanup : cleanups) { + try { + runUninterruptible(cleanup); + } catch (Throwable t) { + if (savedT != null) { + savedT.addSuppressed(t); + suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException; + } else { + savedT = t; + } + } + } + } + if (savedT == null) { + return; + } + if (suppressedInterrupted) { + Thread.currentThread().interrupt(); + } + if (savedT instanceof Error) { + throw (Error) savedT; + } else if (savedT instanceof Exception) { + throw (Exception) savedT; + } else { + throw HyracksDataException.create(savedT); + } + } + // catching Throwable, instanceofs, false-positive unreachable code public static void tryWithCleanups(ThrowingAction action, ThrowingConsumer<Throwable>... 
cleanups) throws Exception { @@ -422,6 +473,19 @@ public class InvokeUtil { } } + /** + * Runs the supplied action, after suspending any pending interruption. An error will be logged if + * the action is itself interrupted. + */ + public static Exception runUninterruptible(Exception root, ThrowingAction action) { + try { + runUninterruptible(action); + } catch (Exception e) { + root = ExceptionUtils.suppress(root, e); + } + return root; + } + /** * Runs the supplied action, after suspending any pending interruption. An error will be logged if * the action is itself interrupted. diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java index c36e4a7cba..da9c8fb566 100644 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java @@ -166,8 +166,12 @@ public class CloudRetryableRequestUtil { LOGGER.warn("Lost suppressed interrupt during ICloudReturnableRequest", e); Thread.currentThread().interrupt(); } - if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) { - throw HyracksDataException.create(e); + try { + if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) { + throw HyracksDataException.create(e); + } + } catch (InterruptedException interruptedEx) { + throw HyracksDataException.create(interruptedEx); } attempt++; retry.beforeRetry(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 0bf74ee1cb..614a22625d 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -63,8 +63,8 @@ import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.api.resources.IDeallocatable; import org.apache.hyracks.api.result.IResultPartitionManager; -import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.common.job.PartitionState; import org.apache.hyracks.control.common.job.profiling.StatsCollector; @@ -420,30 +420,29 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { IFrameReader reader = collector.getReader(); reader.open(); try { - try { - writer.open(); - VSizeFrame frame = new VSizeFrame(this); - while (reader.nextFrame(frame)) { - if (aborted) { - return; - } - ByteBuffer buffer = frame.getBuffer(); - writer.nextFrame(buffer); - buffer.compact(); + writer.open(); + VSizeFrame frame = new VSizeFrame(this); + while (reader.nextFrame(frame)) { + if (aborted) { + return; } - } catch (Exception e) { - originalEx = e; - CleanupUtils.fail(writer, originalEx); - } finally { - originalEx = CleanupUtils.closeSilently(writer, originalEx); + ByteBuffer buffer = frame.getBuffer(); + writer.nextFrame(buffer); + buffer.compact(); } + } catch (Exception e) { + originalEx = e; } finally { - originalEx = CleanupUtils.closeSilently(reader, originalEx); + if (originalEx != null) { + InvokeUtil.tryUninterruptibleWithCleanups(writer::fail, writer::close, reader::close); + } else { + InvokeUtil.tryUninterruptibleWithCleanups(writer::close, reader::close); + } } } catch (Exception e) { originalEx = 
ExceptionUtils.suppress(originalEx, e); } finally { - originalEx = CleanupUtils.closeSilently(collector, originalEx); + InvokeUtil.runUninterruptible(collector::close); } } catch (Exception e) { originalEx = ExceptionUtils.suppress(originalEx, e); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index 52bd07ed8b..7ec4f8638e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -127,12 +127,17 @@ public class LSMTreeIndexAccessor implements ILSMIndexAccessor { if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) { IRetryPolicy policy = new ExponentialRetryPolicy(); while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) { - if (policy.retry(operation.getFailure())) { - operation.setFailure(null); - operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS); - lsmHarness.flush(operation); - } else { - break; + try { + if (policy.retry(operation.getFailure())) { + operation.setFailure(null); + operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS); + lsmHarness.flush(operation); + } else { + break; + } + } catch (InterruptedException e) { + // in reality, this thread won't be interrupted + throw HyracksDataException.create(e); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index d8900ad2af..ee63547d79 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -98,12 +98,17 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) { IRetryPolicy policy = new ExponentialRetryPolicy(); while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) { - if (policy.retry(operation.getFailure())) { - operation.setFailure(null); - operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS); - lsmHarness.flush(operation); - } else { - break; + try { + if (policy.retry(operation.getFailure())) { + operation.setFailure(null); + operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS); + lsmHarness.flush(operation); + } else { + break; + } + } catch (InterruptedException e) { + // in reality, this thread won't be interrupted + throw HyracksDataException.create(e); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java index 080b9ea16a..72a3bb07e8 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java @@ -84,17 +84,13 @@ public class ExponentialRetryPolicy implements IRetryPolicy { } @Override - public boolean retry(Throwable failure) 
{ + public boolean retry(Throwable failure) throws InterruptedException { if (attempt < maxRetries) { - try { - long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay); - if (printDebugLines) { - LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries); - } - TimeUnit.MILLISECONDS.sleep(sleepTime); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay); + if (printDebugLines) { + LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries); } + TimeUnit.MILLISECONDS.sleep(sleepTime); attempt++; delay = delay > maxDelay / 2 ? maxDelay : delay * 2; return true; diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java index 0d18a2bf75..2fd019136b 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java @@ -24,6 +24,7 @@ public interface IRetryPolicy { * @param failure * the cause of the failure (this cannot be null) * @return true if one more attempt should be done + * @throws InterruptedException if the retry policy can be interrupted */ - boolean retry(Throwable failure); + boolean retry(Throwable failure) throws InterruptedException; }
