This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 6e83abc61cfb5830bd9856a16efe9d94e8ed3526 Author: Savyasach Reddy <[email protected]> AuthorDate: Thu Jan 16 17:38:06 2025 +0530 [ASTERIXDB-3564][STO]: Avoid halts on IO operation failures - user model changes: no - storage format changes: no - interface changes: yes details: - Retry Flush Operations on IOException - Avoid Halts on Merge failure - Make Create Index operation retryable Ext-ref: MB-63040 Change-Id: If253ea03f5baecbab226e527abb4267670a4233e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19187 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ritik Raj <[email protected]> Tested-by: Ritik Raj <[email protected]> Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19534 Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- asterixdb/asterix-app/pom.xml | 4 +- .../org/apache/asterix/app/nc/HaltCallback.java | 11 +- .../asterix/app/translator/QueryTranslator.java | 6 +- .../test/cloud_storage/CloudStorageGCSTest.java | 2 + ...STest.java => GCSCloudStorageUnstableTest.java} | 52 +++--- .../cloud_storage/RetryingQueryTranslator.java | 157 ++++++++++++++++ .../UnstableStatementExecutorExtension.java | 55 ++++++ .../UnstableStatementExecutorFactory.java | 46 +++++ .../src/test/resources/cc-cloud-storage-gcs.conf | 2 + .../asterix/cloud/CloudResettableInputStream.java | 7 +- .../asterix/cloud/clients/UnstableCloudClient.java | 70 +++++++- .../cloud/clients/aws/s3/S3BufferedWriter.java | 3 +- .../cloud/clients/google/gcs/GCSWriter.java | 10 +- .../column/metadata/schema/ObjectSchemaNode.java | 5 +- .../operation/lsm/flush/FlushColumnMetadata.java | 7 +- .../ioopcallbacks/LSMIOOperationCallback.java | 11 ++ .../LSMSecondaryIndexBulkLoadNodePushable.java | 19 +- .../apache/hyracks/api/exceptions/ErrorCode.java | 1 + .../src/main/resources/errormsg/en.properties | 1 + .../storage/am/bloomfilter/impls/BloomFilter.java | 24 ++- .../storage/am/common/api/IPageManager.java | 6 + .../IndexBulkLoadOperatorNodePushable.java | 22 ++- .../AppendOnlyLinkedMetadataPageManager.java | 7 + .../common/impls/AbstractTreeIndexBulkLoader.java | 1 + .../storage/am/lsm/btree/impls/LSMBTree.java | 122 +++++++------ .../AbstractLSMWithBloomFilterDiskComponent.java | 9 +- .../api/AbstractLSMWithBuddyDiskComponent.java | 15 +- .../am/lsm/common/api/ILSMDiskComponent.java | 5 + .../am/lsm/common/api/ILSMIOOperationCallback.java | 6 + .../lsm/common/impls/AbstractLSMDiskComponent.java | 14 +- .../am/lsm/common/impls/AbstractLSMIndex.java | 7 +- .../impls/ChainedLSMDiskComponentBulkLoader.java | 17 +- .../am/lsm/common/impls/EmptyComponent.java | 5 + .../common/impls/LSMComponentFileReferences.java | 2 +- .../storage/am/lsm/common/impls/LSMHarness.java | 3 +- .../impls/LSMIndexDiskComponentBulkLoader.java | 6 +- .../impls/LSMInvertedComponentFileReferences.java | 43 +++++ .../am/lsm/common/impls/LSMTreeIndexAccessor.java | 14 ++ .../storage/am/lsm/common/util/ComponentUtils.java | 16 ++ .../lsm/invertedindex/impls/LSMInvertedIndex.java | 188 ++++++++++++-------- .../impls/LSMInvertedIndexAccessor.java | 14 ++ .../impls/LSMInvertedIndexDiskComponent.java | 27 +-- .../impls/LSMInvertedIndexFileManager.java | 11 +- .../impls/LSMInvertedIndexFlushOperation.java | 8 +- .../impls/LSMInvertedIndexMergeOperation.java | 8 +- .../invertedindex/ondisk/OnDiskInvertedIndex.java | 3 + .../storage/am/lsm/rtree/impls/LSMRTree.java | 112 ++++++------ .../rtree/impls/LSMRTreeWithAntiMatterTuples.java | 197 +++++++++++---------- .../hyracks/storage/am/rtree/impls/RTree.java | 23 ++- .../storage/common/buffercache/BufferCache.java | 12 +- .../common/buffercache/FIFOLocalWriter.java | 14 +- .../common/buffercache/IFIFOPageWriter.java | 4 +- .../hyracks/util/ExponentialRetryPolicy.java | 22 ++- 53 files changed, 1055 insertions(+), 401 deletions(-) diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index 4e85de037d..c801283ab1 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -494,7 +494,7 @@ **/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java, **/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java, **/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java, - **/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java, + **/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java,**/GCSCloudStorageUnstableTest, **/AzureBlobStorageExternalDatasetOnePartitionTest.java,**/SqlppSinglePointLookupExecutionTest.java, **/Atomic*.java, **/AwsS3*.java, **/*SqlppHdfs*.java, **/*RQGTest.java, **/*RQJTest.java </test.excludes> @@ -620,7 +620,7 @@ <id>asterix-gerrit-cloud-nons3-tests</id> <properties> <test.includes> - **/CloudStorageGCSTest.java, **/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java, + **/GCSCloudStorageUnstableTest.java,**/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java, **/AzureBlobStorageExternalDatasetOnePartitionTest.java,**/CloudStorageUnstableTest.java, **/*SqlppHdfs*.java </test.includes> <failIfNoTests>false</failIfNoTests> diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java index 466b617783..f66f097b4f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java @@ -47,12 +47,9 @@ public class HaltCallback implements IIoOperationFailedCallback { } private boolean haltOnFailure(ILSMIOOperation operation) { - switch (operation.getIOOperationType()) { - case CLEANUP: - case REPLICATE: - return false; - default: - return true; - } + return switch (operation.getIOOperationType()) { + case CLEANUP, REPLICATE, MERGE -> false; + default -> true; + }; } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index c4c16abe2b..80fba4fee9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -480,7 +480,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (stats.getProfileType() == Stats.ProfileType.FULL) { this.jobFlags.add(JobFlag.PROFILE_RUNTIME); } - handleLoadStatement(metadataProvider, stmt, hcc); + handleLoadStatement(metadataProvider, stmt, hcc, requestParameters); break; case COPY_FROM: if (stats.getProfileType() == Stats.ProfileType.FULL) { @@ -4022,8 +4022,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleLoadStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) - throws Exception { + protected void handleLoadStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, + IRequestParameters requestParameters) throws Exception { LoadStatement loadStmt = (LoadStatement) stmt; String datasetName = loadStmt.getDatasetName(); metadataProvider.validateDatabaseObjectName(loadStmt.getNamespace(), datasetName, loadStmt.getSourceLocation()); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java index 65b5adf5e6..8c071f08a1 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java @@ -40,6 +40,7 @@ import org.junit.AfterClass; import org.junit.Assume; import org.junit.BeforeClass; import org.junit.FixMethodOrder; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.MethodSorters; @@ -57,6 +58,7 @@ import com.google.cloud.storage.StorageOptions; * Run tests in cloud deployment environment */ @RunWith(Parameterized.class) +@Ignore @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class CloudStorageGCSTest { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/GCSCloudStorageUnstableTest.java similarity index 73% copy from asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java copy to asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/GCSCloudStorageUnstableTest.java index 65b5adf5e6..c97e459fe3 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/GCSCloudStorageUnstableTest.java @@ -16,27 +16,34 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.asterix.test.cloud_storage; import static org.apache.asterix.api.common.LocalCloudUtil.CLOUD_STORAGE_BUCKET; import static org.apache.asterix.api.common.LocalCloudUtil.MOCK_SERVER_REGION; -import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Objects; -import java.util.Random; import org.apache.asterix.api.common.LocalCloudUtilAdobeMock; +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.storage.StorageIOStats; import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.test.runtime.ExecutionTestUtil; import org.apache.asterix.test.runtime.LangExecutionUtil; import org.apache.asterix.testframework.context.TestCaseContext; import org.apache.asterix.testframework.xml.Description; import org.apache.asterix.testframework.xml.TestCase; +import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.storage.common.buffercache.BufferCache; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Assume; import org.junit.BeforeClass; import org.junit.FixMethodOrder; @@ -44,7 +51,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.MethodSorters; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import com.google.cloud.NoCredentials; import com.google.cloud.storage.Blob; @@ -58,7 +64,7 @@ import com.google.cloud.storage.StorageOptions; */ @RunWith(Parameterized.class) @FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class CloudStorageGCSTest { +public class GCSCloudStorageUnstableTest { private static final Logger LOGGER = LogManager.getLogger(); @@ -71,13 +77,14 @@ public class CloudStorageGCSTest { public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:24443"; private static final String MOCK_SERVER_PROJECT_ID = "asterixdb-gcs-test-project-id"; - public CloudStorageGCSTest(TestCaseContext tcCtx) { + public GCSCloudStorageUnstableTest(TestCaseContext tcCtx) { this.tcCtx = tcCtx; } @BeforeClass public static void setUp() throws Exception { LocalCloudUtilAdobeMock.startS3CloudEnvironment(true, true); + System.setProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE, "true"); Storage storage = StorageOptions.newBuilder().setHost(MOCK_SERVER_HOSTNAME) .setCredentials(NoCredentials.getInstance()).setProjectId(MOCK_SERVER_PROJECT_ID).build().getService(); cleanup(storage); @@ -92,27 +99,13 @@ public class CloudStorageGCSTest { @AfterClass public static void tearDown() throws Exception { + System.clearProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE); LangExecutionUtil.tearDown(); - LocalCloudUtilAdobeMock.shutdownSilently(); } - @Parameters(name = "CloudStorageGCSTest {index}: {0}") + @Parameterized.Parameters(name = "GCSCloudStorageUnstableTest {index}: {0}") public static Collection<Object[]> tests() throws Exception { - long seed = System.nanoTime(); - Random random = new Random(seed); - LOGGER.info("CloudStorageGCSTest seed {}", seed); - Collection<Object[]> tests = LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS); - List<Object[]> selected = new ArrayList<>(); - for (Object[] test : tests) { - if (!Objects.equals(((TestCaseContext) test[0]).getTestGroups()[0].getName(), "sqlpp_queries")) { - selected.add(test); - } - // Select 10% of the tests randomly - else if (random.nextInt(10) == 0) { - selected.add(test); - } - } - return selected; + return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS); } @Test @@ -120,6 +113,19 @@ public class CloudStorageGCSTest { List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit(); Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription()))); LangExecutionUtil.test(tcCtx); + for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs) { + IDatasetLifecycleManager lifecycleManager = + ((INcApplicationContext) nc.getApplicationContext()).getDatasetLifecycleManager(); + StorageIOStats stats = lifecycleManager.getDatasetsIOStats(); + while (stats.getPendingFlushes() != 0 || stats.getPendingMerges() != 0) { + stats = lifecycleManager.getDatasetsIOStats(); + } + } + IBufferCache bufferCache; + for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs) { + bufferCache = ((INcApplicationContext) nc.getApplicationContext()).getBufferCache(); + Assert.assertTrue(((BufferCache) bufferCache).isClean()); + } } private static String getText(Description description) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/RetryingQueryTranslator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/RetryingQueryTranslator.java new file mode 100644 index 0000000000..af580b26e8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/RetryingQueryTranslator.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.test.cloud_storage; + +import static org.apache.asterix.cloud.clients.UnstableCloudClient.ERROR_RATE; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.common.api.IResponsePrinter; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.lang.common.statement.LoadStatement; +import org.apache.asterix.lang.common.statement.TruncateDatasetStatement; +import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.utils.Creator; +import org.apache.asterix.translator.IRequestParameters; +import org.apache.asterix.translator.SessionOutput; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class RetryingQueryTranslator extends QueryTranslator { + + private static final Logger LOGGER = LogManager.getLogger(); + + public RetryingQueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, + ILangCompilationProvider compilationProvider, ExecutorService executorService, + IResponsePrinter responsePrinter) { + super(appCtx, statements, output, compilationProvider, executorService, responsePrinter); + } + + @Override + public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt, + IHyracksClientConnection hcc, IRequestParameters requestParameters, Creator creator) throws Exception { + int times = 100; + Exception ex = null; + double initialErrorRate = ERROR_RATE.get(); + try { + while (times-- > 0) { + try { + super.handleCreateIndexStatement(metadataProvider, stmt, hcc, requestParameters, creator); + ex = null; + break; + } catch (Exception e) { + ERROR_RATE.set(Double.max(ERROR_RATE.get() - 0.01d, 0.01d)); + if (retryOnFailure(e)) { + LOGGER.error("Attempt: {}, Failed to create index", 100 - times, e); + metadataProvider.getLocks().reset(); + ex = e; + } else { + throw e; + } + } + } + if (ex != null) { + throw ex; + } + } finally { + ERROR_RATE.set(initialErrorRate); + } + } + + @Override + public void handleAnalyzeStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, + IRequestParameters requestParameters) throws Exception { + int times = 100; + Exception ex = null; + while (times-- > 0) { + try { + super.handleAnalyzeStatement(metadataProvider, stmt, hcc, requestParameters); + ex = null; + break; + } catch (Exception e) { + if (retryOnFailure(e)) { + LOGGER.error("Attempt: {}, Failed to create index", 100 - times, e); + metadataProvider.getLocks().reset(); + ex = e; + } else { + throw e; + } + } + } + if (ex != null) { + throw ex; + } + } + + @Override + public void handleLoadStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, + IRequestParameters requestParameters) throws Exception { + int times = 100; + Exception ex = null; + double initialErrorRate = ERROR_RATE.get(); + try { + while (times-- > 0) { + try { + super.handleLoadStatement(metadataProvider, stmt, hcc, requestParameters); + ex = null; + break; + } catch (Exception e) { + ERROR_RATE.set(Double.max(ERROR_RATE.get() - 0.01d, 0.01d)); + if (retryOnFailure(e)) { + LOGGER.error("Attempt: {}, Failed to load", 100 - times, e); + metadataProvider.getLocks().reset(); + ex = e; + LoadStatement loadStmt = (LoadStatement) stmt; + TruncateDatasetStatement truncateDatasetStatement = new TruncateDatasetStatement( + loadStmt.getNamespace(), new Identifier(loadStmt.getDatasetName()), true); + super.handleDatasetTruncateStatement(metadataProvider, truncateDatasetStatement, + requestParameters); + metadataProvider.getLocks().reset(); + + } else { + throw e; + } + } + } + if (ex != null) { + throw ex; + } + } finally { + ERROR_RATE.set(initialErrorRate); + } + } + + private boolean retryOnFailure(Exception e) { + if (e instanceof HyracksDataException) { + return ((HyracksDataException) e).getErrorCode() == ErrorCode.FAILED_IO_OPERATION.intValue() + && ExceptionUtils.getRootCause(e).getMessage().contains("Simulated error"); + } + return false; + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorExtension.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorExtension.java new file mode 100644 index 0000000000..bf9aecbb83 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorExtension.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.test.cloud_storage; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.asterix.app.cc.IStatementExecutorExtension; +import org.apache.asterix.common.api.ExtensionId; +import org.apache.asterix.translator.IStatementExecutorFactory; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.application.IServiceContext; + +public class UnstableStatementExecutorExtension implements IStatementExecutorExtension { + + public static final ExtensionId RETRYING_QUERY_TRANSLATOR_EXTENSION_ID = + new ExtensionId(UnstableStatementExecutorExtension.class.getSimpleName(), 0); + + @Override + public ExtensionId getId() { + return RETRYING_QUERY_TRANSLATOR_EXTENSION_ID; + } + + @Override + public void configure(List<Pair<String, String>> args, IServiceContext serviceCtx) { + + } + + @Override + public IStatementExecutorFactory getQueryTranslatorFactory() { + return null; + } + + @Override + public IStatementExecutorFactory getStatementExecutorFactory(ExecutorService executorService) { + return new UnstableStatementExecutorFactory(executorService); + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorFactory.java new file mode 100644 index 0000000000..f65276bb2d --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.test.cloud_storage; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; +import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.common.api.IResponsePrinter; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.translator.SessionOutput; + +public class UnstableStatementExecutorFactory extends DefaultStatementExecutorFactory { + public UnstableStatementExecutorFactory(ExecutorService executorService) { + super(executorService); + } + + @Override + public QueryTranslator create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, + ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider, + IResponsePrinter responsePrinter) { + return new RetryingQueryTranslator(appCtx, statements, output, compilationProvider, executorService, + responsePrinter); + } +} diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf index 9b6f5477cb..376252e690 100644 --- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf +++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf @@ -73,3 +73,5 @@ cloud.storage.anonymous.auth=true cloud.storage.cache.policy=selective cloud.max.write.requests.per.second=1000 cloud.max.read.requests.per.second=5000 + +[extension/org.apache.asterix.test.cloud_storage.UnstableStatementExecutorExtension] diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java index a233ca5afb..9b832a2018 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import org.apache.asterix.cloud.clients.ICloudBufferedWriter; import org.apache.asterix.cloud.clients.ICloudWriter; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest; import org.apache.hyracks.cloud.io.request.ICloudRequest; @@ -158,7 +159,7 @@ public class CloudResettableInputStream extends InputStream implements ICloudWri CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, retry); } catch (Exception e) { LOGGER.error(e); - throw HyracksDataException.create(e); + throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, e); } } bufferedWriter.finish(); @@ -190,7 +191,7 @@ public class CloudResettableInputStream extends InputStream implements ICloudWri try { close(); } catch (IOException e) { - throw HyracksDataException.create(e); + throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, e); } } @@ -203,7 +204,7 @@ public class CloudResettableInputStream extends InputStream implements ICloudWri CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, retry); } catch (Exception e) { LOGGER.error(e); - throw HyracksDataException.create(e); + throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, e); } writeBuffer.clear(); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java index 2f61b31990..ccd6da1137 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java @@ -25,11 +25,15 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import org.apache.asterix.cloud.CloudResettableInputStream; import org.apache.asterix.cloud.IWriteBufferProvider; import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient; +import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient; +import org.apache.asterix.cloud.clients.google.gcs.GCSWriter; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.control.nc.io.IOManager; @@ -39,7 +43,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; public class UnstableCloudClient implements ICloudClient { // 10% error rate - private static final double ERROR_RATE = 0.1d; + public static final AtomicReference<Double> ERROR_RATE = new AtomicReference<>(0.11d); private static final Random RANDOM = new Random(0); private final ICloudClient cloudClient; @@ -61,6 +65,9 @@ public class UnstableCloudClient implements ICloudClient { public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) { if (cloudClient instanceof S3CloudClient) { return createUnstableWriter((S3CloudClient) cloudClient, bucket, path, bufferProvider); + } else if (cloudClient instanceof GCSCloudClient) { + return new UnstableGCSCloudWriter(cloudClient.createWriter(bucket, path, bufferProvider), + cloudClient.getWriteBufferSize()); } return cloudClient.createWriter(bucket, path, bufferProvider); } @@ -138,8 +145,8 @@ public class UnstableCloudClient implements ICloudClient { private static void fail() throws HyracksDataException { double prob = RANDOM.nextInt(100) / 100.0d; - if (prob <= ERROR_RATE) { - throw HyracksDataException.create(new IOException("Simulated error")); + if (prob < ERROR_RATE.get()) { + throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, new IOException("Simulated error")); } } @@ -150,6 +157,63 @@ public class UnstableCloudClient implements ICloudClient { return new CloudResettableInputStream(bufferedWriter, bufferProvider); } + /** + * An unstable cloud writer that mimics the functionality of {@link GCSWriter} + */ + private static class UnstableGCSCloudWriter implements ICloudWriter { + private final ICloudWriter writer; + private final int writeBufferSize; + + UnstableGCSCloudWriter(ICloudWriter writer, int writeBufferSize) { + this.writer = writer; + this.writeBufferSize = writeBufferSize; + } + + @Override + public int write(ByteBuffer header, ByteBuffer page) throws HyracksDataException { + return write(header) + write(page); + } + + @Override + public int write(ByteBuffer page) throws HyracksDataException { + if (position() == 0) { + fail(); + } + long uploadsToBeTriggered = + ((position() + page.remaining()) / writeBufferSize) - (position() / writeBufferSize); + while (uploadsToBeTriggered-- > 0) { + fail(); + } + return writer.write(page); + } + + @Override + public void write(int b) throws HyracksDataException { + write(ByteBuffer.wrap(new byte[] { (byte) b })); + } + + @Override + public int write(byte[] b, int off, int len) throws HyracksDataException { + return write(ByteBuffer.wrap(b, off, len)); + } + + @Override + public long position() { + return writer.position(); + } + + @Override + public void finish() throws HyracksDataException { + fail(); + writer.finish(); + } + + @Override + public void abort() throws HyracksDataException { + writer.abort(); + } + } + private static class UnstableCloudBufferedWriter implements ICloudBufferedWriter { private final ICloudBufferedWriter bufferedWriter; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java index 53d6546919..7bd7ad5bbf 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.asterix.cloud.clients.ICloudBufferedWriter; import org.apache.asterix.cloud.clients.ICloudGuardian; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil; import org.apache.logging.log4j.LogManager; @@ -131,7 +132,7 @@ public class S3BufferedWriter implements ICloudBufferedWriter { try { s3Client.completeMultipartUpload(request); } catch (Exception e) { - throw HyracksDataException.create(e); + throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, e); } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java index f240ad4d14..b142ea3b79 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import org.apache.asterix.cloud.clients.ICloudGuardian; import org.apache.asterix.cloud.clients.ICloudWriter; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -71,15 +72,15 @@ public class GCSWriter implements ICloudWriter { while (uploadsToBeTriggered-- > 0) { profiler.objectMultipartUpload(); } - setUploadId(); int written = 0; try { + setUploadId(); while (page.hasRemaining()) { written += writer.write(page); } } catch (IOException | RuntimeException e) { - throw HyracksDataException.create(e); + throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, e); } writtenBytes += written; @@ -104,12 +105,13 @@ public class GCSWriter implements ICloudWriter { @Override public void finish() throws HyracksDataException { guardian.checkWriteAccess(bucket, path); - setUploadId(); profiler.objectMultipartUpload(); try { + setUploadId(); + writer.close(); } catch (IOException | RuntimeException e) { - throw HyracksDataException.create(e); + throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, e); } finally { writer = null; } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java index 1f74fb3c0a..4cf2d23e40 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java @@ -164,7 +164,10 @@ public final class ObjectSchemaNode extends AbstractSchemaNestedNode { public void abort(DataInputStream input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels) throws IOException { - definitionLevels.put(this, new RunLengthIntArray()); + input.readByte(); // Skip the type tag, see ObjectSchemaNode#serialize + if (definitionLevels != null) { + definitionLevels.put(this, new RunLengthIntArray()); + } int numberOfChildren = input.readInt(); diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java index 5aca3a4577..9c58247ca0 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java @@ -271,7 +271,8 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { @Override public void abort() throws HyracksDataException { - DataInputStream input = new DataInputStream(new ByteArrayInputStream(serializedMetadata.getByteArray())); + DataInputStream input = new DataInputStream(new ByteArrayInputStream(serializedMetadata.getByteArray(), + serializedMetadata.getStartOffset(), serializedMetadata.getLength())); try { abort(input); } catch (IOException e) { @@ -280,6 +281,7 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { } private void abort(DataInputStream input) throws IOException { + input.skipBytes(OFFSETS_SIZE); level = -1; repeated = 0; changed = false; @@ -290,6 +292,9 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { fieldNamesDictionary.abort(input); definitionLevels.clear(); root.abort(input, definitionLevels); + if (metaRoot != null) { + metaRoot.abort(input, definitionLevels); + } } public static void deserializeWriters(DataInput input, List<IColumnValuesWriter> writers, diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java index 87aa3bd3c0..52af32909a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java @@ -141,6 +141,17 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback { } } + @Override + public void afterFailure(ILSMIOOperation operation) { + if (isMerge(operation)) { + try { + ioManager.delete(getOperationMaskFilePath(operation)); + } catch (HyracksDataException e) { + operation.getFailure().addSuppressed(e); + } + } + } + protected void addComponentToCheckpoint(ILSMIOOperation operation) throws HyracksDataException { // will always update the checkpoint file even if no new component was created FileReference target = operation.getTarget(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java index 21c3b27c70..17c5445514 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java @@ -57,6 +57,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI private LSMIndexDiskComponentBulkLoader componentBulkLoader; private int currentComponentPos = -1; + private boolean failed = false; public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc, IIndexDataflowHelperFactory primaryIndexHelperFactory, @@ -98,7 +99,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI public void close() throws HyracksDataException { HyracksDataException closeException = null; try { - endCurrentComponent(); + endOrAbortCurrentComponent(); } catch (HyracksDataException e) { closeException = e; } @@ -151,6 +152,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI @Override public void fail() throws HyracksDataException { writer.fail(); + failed = true; } @Override @@ -177,15 +179,22 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI } } - private void endCurrentComponent() throws HyracksDataException { + private void endOrAbortCurrentComponent() throws HyracksDataException { if (componentBulkLoader != null) { - componentBulkLoader.end(); - componentBulkLoader = null; + try { + if (!failed) { + componentBulkLoader.end(); + } else { + componentBulkLoader.abort(); + } + } finally { + componentBulkLoader = null; + } } } private void loadNewComponent(int componentPos) throws HyracksDataException { - endCurrentComponent(); + endOrAbortCurrentComponent(); // This should never call componentBulkLoader.abort() int numTuples = getNumDeletedTuples(componentPos); ILSMDiskComponent primaryComponent = primaryIndex.getDiskComponents().get(componentPos); Map<String, Object> parameters = new HashMap<>(); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 7cb107dfc5..d7abf4c057 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -162,6 +162,7 @@ public enum ErrorCode implements IError { EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA(132), EMPTY_TYPE_INFERRED(133), SCHEMA_LIMIT_EXCEEDED(134), + FAILED_IO_OPERATION(135), // Compilation error codes. RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000), diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index e1fbe3020c..e9569be66f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -152,6 +152,7 @@ 132 = Extra field in the result, field '%1$s' does not exist at '%2$s' in the schema 133 = Schema could not be inferred, empty types found in the result 134 = Schema Limit exceeded, maximum number of heterogeneous schemas allowed : '%1$s' +135 = An IO Operation has failed 10000 = The given rule collection %1$s is not an instance of the List class. 10001 = Cannot compose partition constraint %1$s with %2$s diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java index aa174741a6..abb03ebbff 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java @@ -386,13 +386,15 @@ public class BloomFilter { @Override public void end() throws HyracksDataException { - allocateAndInitMetaDataPage(); - pageWriter.write(metaDataPage); - for (ICachedPage p : pages) { - pageWriter.write(p); - } - if (hasFailed()) { - throw HyracksDataException.create(getFailure()); + try { + allocateAndInitMetaDataPage(); + pageWriter.write(metaDataPage); + for (ICachedPage p : pages) { + pageWriter.write(p); + } + } catch (HyracksDataException e) { + handleException(); + throw e; } BloomFilter.this.numBits = numBits; BloomFilter.this.numHashes = numHashes; @@ -401,8 +403,7 @@ public class BloomFilter { BloomFilter.this.version = BLOCKED_BLOOM_FILTER_VERSION; } - @Override - public void abort() throws HyracksDataException { + private void handleException() { for (ICachedPage p : pages) { if (p != null) { bufferCache.returnPage(p, false); @@ -413,6 +414,11 @@ public class BloomFilter { } } + @Override + public void abort() throws HyracksDataException { + handleException(); + } + @Override public void force() throws HyracksDataException { bufferCache.force(fileId, false); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java index 208d3b701e..ff28464a93 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java @@ -48,6 +48,12 @@ public interface IPageManager { */ void close(IPageWriteFailureCallback failureCallback) throws HyracksDataException; + /** + * Return all the pages from the page manager + */ + default void returnAllPages() { + } + /** * Create a metadata frame to be used for reading and writing to metadata pages * diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java index 0206f201d0..d606a5c649 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java @@ -61,6 +61,7 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu protected final IIndexBulkLoader[] bulkLoaders; protected ITupleFilter tupleFilter; protected FrameTupleReference frameTuple; + private boolean failed = false; public IndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint, @@ -151,6 +152,7 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu @Override public void fail() throws HyracksDataException { writer.fail(); + failed = true; } protected void initializeBulkLoader(IIndex index, int indexId) throws HyracksDataException { @@ -159,12 +161,28 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu } private void closeBulkLoaders() throws HyracksDataException { + HyracksDataException failure = null; for (IIndexBulkLoader bulkLoader : bulkLoaders) { // bulkloader can be null if an exception is thrown before it is initialized. - if (bulkLoader != null) { - bulkLoader.end(); + try { + if (bulkLoader != null) { + if (failure == null && !failed) { + bulkLoader.end(); + } else { + bulkLoader.abort(); + } + } + } catch (HyracksDataException e) { + if (failure == null) { + failure = e; + } else { + failure.addSuppressed(e); + } } } + if (failure != null) { + throw failure; + } } protected static void closeIndexes(IIndex[] indexes, IIndexDataflowHelper[] indexHelpers, diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java index dae01bfe88..9c51d92202 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java @@ -152,6 +152,13 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager metadataPages.clear(); } + @Override + public void returnAllPages() { + for (ICachedPage page : metadataPages) { + bufferCache.returnPage(page, false); + } + } + /** * For storage on append-only media (such as HDFS), the meta data page has to be written last. * However, some implementations still write the meta data to the front. To deal with this as well diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java index d5f836c51b..5ef3aab8b2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java @@ -130,6 +130,7 @@ public abstract class AbstractTreeIndexBulkLoader extends PageWriteFailureCallba } } releasedLatches = true; + freePageManager.returnAllPages(); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index 05e78dd44d..3526d2f144 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -238,73 +238,83 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation; LSMBTreeMemoryComponent flushingComponent = (LSMBTreeMemoryComponent) flushOp.getFlushingComponent(); IIndexAccessor accessor = flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - ILSMDiskComponent component; - ILSMDiskComponentBulkLoader componentBulkLoader; + ILSMDiskComponent component = null; + ILSMDiskComponentBulkLoader componentBulkLoader = null; try { - RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null); - long numElements = 0L; - if (hasBloomFilter) { - //count elements in btree for creating Bloomfilter - IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor(); - accessor.search(countingCursor, nullPred); + try { + RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null); + long numElements = 0L; + if (hasBloomFilter) { + //count elements in btree for creating Bloomfilter + IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor(); + accessor.search(countingCursor, nullPred); + try { + while (countingCursor.hasNext()) { + countingCursor.next(); + ITupleReference countTuple = countingCursor.getTuple(); + numElements = IntegerPointable.getInteger(countTuple.getFieldData(0), + countTuple.getFieldStart(0)); + } + } finally { + try { + countingCursor.close(); + } finally { + countingCursor.destroy(); + } + } + } + component = createDiskComponent(componentFactory, flushOp.getTarget(), null, + flushOp.getBloomFilterTarget(), true); + componentBulkLoader = component.createBulkLoader(operation, 1.0f, false, numElements, false, false, + false, pageWriteCallbackFactory.createPageWriteCallback()); + IIndexCursor scanCursor = accessor.createSearchCursor(false); + accessor.search(scanCursor, nullPred); try { - while (countingCursor.hasNext()) { - countingCursor.next(); - ITupleReference countTuple = countingCursor.getTuple(); - numElements = - IntegerPointable.getInteger(countTuple.getFieldData(0), countTuple.getFieldStart(0)); + while (scanCursor.hasNext()) { + scanCursor.next(); + // we can safely throw away updated tuples in secondary BTree components, because they correspond to + // deleted tuples + if (updateAware && ((LSMBTreeTupleReference) scanCursor.getTuple()).isUpdated()) { + continue; + } + componentBulkLoader.add(scanCursor.getTuple()); } } finally { try { - countingCursor.close(); + scanCursor.close(); } finally { - countingCursor.destroy(); + scanCursor.destroy(); } } + } finally { + accessor.destroy(); } - component = createDiskComponent(componentFactory, flushOp.getTarget(), null, flushOp.getBloomFilterTarget(), - true); - componentBulkLoader = component.createBulkLoader(operation, 1.0f, false, numElements, false, false, false, - pageWriteCallbackFactory.createPageWriteCallback()); - IIndexCursor scanCursor = accessor.createSearchCursor(false); - accessor.search(scanCursor, nullPred); + if (component.getLSMComponentFilter() != null) { + List<ITupleReference> filterTuples = new ArrayList<>(); + filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); + filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); + getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); + getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); + } + // Write metadata from memory component to disk + // Q. what about the merge operation? how do we resolve conflicts + // A. Through providing an appropriate ILSMIOOperationCallback + // Must not reset the metadata before the flush is completed + // Use the copy of the metadata in the opContext + // TODO This code should be in the callback and not in the index + flushingComponent.getMetadata().copy(component.getMetadata()); + componentBulkLoader.end(); + } catch (Throwable e) { try { - while (scanCursor.hasNext()) { - scanCursor.next(); - // we can safely throw away updated tuples in secondary BTree components, because they correspond to - // deleted tuples - if (updateAware && ((LSMBTreeTupleReference) scanCursor.getTuple()).isUpdated()) { - continue; - } - componentBulkLoader.add(scanCursor.getTuple()); - } - } finally { - try { - scanCursor.close(); - } finally { - scanCursor.destroy(); + if (componentBulkLoader != null) { + componentBulkLoader.abort(); } + } catch (Throwable th) { + e.addSuppressed(th); } - } finally { - accessor.destroy(); - } - if (component.getLSMComponentFilter() != null) { - List<ITupleReference> filterTuples = new ArrayList<>(); - filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); - filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); - getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, - NoOpOperationCallback.INSTANCE); - getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); + throw e; } - // Write metadata from memory component to disk - // Q. what about the merge operation? how do we resolve conflicts - // A. Through providing an appropriate ILSMIOOperationCallback - // Must not reset the metadata before the flush is completed - // Use the copy of the metadata in the opContext - // TODO This code should be in the callback and not in the index - flushingComponent.getMetadata().copy(component.getMetadata()); - - componentBulkLoader.end(); return component; } @@ -313,7 +323,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException { LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation; IIndexCursor cursor = mergeOp.getCursor(); - ILSMDiskComponent mergedComponent; + ILSMDiskComponent mergedComponent = null; ILSMDiskComponentBulkLoader componentBulkLoader = null; try { try { @@ -349,6 +359,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getMetadataHolder()); } + componentBulkLoader.end(); } catch (Throwable e) { // NOSONAR.. As per the contract, we should either abort or end try { if (componentBulkLoader != null) { @@ -359,7 +370,6 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex { } throw e; } - componentBulkLoader.end(); return mergedComponent; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java index db36c72a99..c77d7ff884 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java @@ -47,8 +47,13 @@ public abstract class AbstractLSMWithBloomFilterDiskComponent extends AbstractLS public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException { // The order of forcing the dirty page to be flushed is critical. The // bloom filter must be always done first. - ComponentUtils.markAsValid(getBloomFilterBufferCache(), getBloomFilter(), persist); - super.markAsValid(persist, callback); + try { + ComponentUtils.markAsValid(getBloomFilterBufferCache(), getBloomFilter(), persist); + super.markAsValid(persist, callback); + } catch (HyracksDataException ex) { + returnPages(); + throw ex; + } } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java index fda48f04c3..158a3b8871 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java @@ -40,8 +40,19 @@ public abstract class AbstractLSMWithBuddyDiskComponent extends AbstractLSMWithB @Override public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException { - super.markAsValid(persist, callback); - ComponentUtils.markAsValid(getBuddyIndex(), persist, callback); + try { + super.markAsValid(persist, callback); + ComponentUtils.markAsValid(getBuddyIndex(), persist, callback); + } catch (HyracksDataException ex) { + returnPages(); + throw ex; + } + } + + @Override + public void returnPages() { + getBuddyIndex().getPageManager().returnAllPages(); + super.returnPages(); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java index 029a59de63..e41a17efa7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java @@ -137,4 +137,9 @@ public interface ILSMDiskComponent extends ILSMComponent { ILSMDiskComponentBulkLoader createBulkLoader(ILSMIOOperation operation, float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter, boolean cleanupEmptyComponent, IPageWriteCallback callback) throws HyracksDataException; + + /** + * Returns all pages of the component to the buffer cache + */ + void returnPages(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java index a778a4c1ee..542cf47b6a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java @@ -58,6 +58,12 @@ public interface ILSMIOOperationCallback { */ void afterFinalize(ILSMIOOperation operation) throws HyracksDataException; + /** + * This method is called on an IO operation after the operation fails + */ + default void afterFailure(ILSMIOOperation operation) { + } + /** * This method is called after the schduler is done with the IO operation * For operation that are not scheduled, this call is skipped diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java index 627b853919..b6066fc736 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -160,8 +160,18 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl */ @Override public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException { - ComponentUtils.markAsValid(getMetadataHolder(), persist, callback); - LOGGER.debug("marked {} as valid component with id {}", getIndex(), getId()); + try { + ComponentUtils.markAsValid(getMetadataHolder(), persist, callback); + LOGGER.debug("marked {} as valid component with id {}", getIndex(), getId()); + } catch (Exception e) { + returnPages(); + throw e; + } + } + + @Override + public void returnPages() { + ComponentUtils.returnPages(getMetadataHolder()); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index 4f7c624996..3f60978c83 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -583,7 +583,12 @@ public abstract class AbstractLSMIndex implements ILSMIndex { throws HyracksDataException { ILSMDiskComponent component = factory.createComponent(this, new LSMComponentFileReferences(insertFileReference, deleteIndexFileReference, bloomFilterFileRef)); - component.activate(createComponent); + try { + component.activate(createComponent); + } catch (HyracksDataException e) { + component.returnPages(); + throw e; + } return component; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java index 0214712c27..f90da79d53 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java @@ -127,9 +127,20 @@ public class ChainedLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkL @Override public void abort() throws HyracksDataException { operation.setStatus(LSMIOOperationStatus.FAILURE); - final int bulkloadersCount = bulkloaderChain.size(); - for (int i = 0; i < bulkloadersCount; i++) { - bulkloaderChain.get(i).abort(); + HyracksDataException failure = null; + for (IChainedComponentBulkLoader componentBulkLoader : bulkloaderChain) { + try { + componentBulkLoader.abort(); + } catch (HyracksDataException e) { + if (failure == null) { + failure = e; + } else { + failure.addSuppressed(e); + } + } + } + if (failure != null) { + throw failure; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java index 9a112ee4d5..0dfb12ef27 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java @@ -157,4 +157,9 @@ public class EmptyComponent implements ILSMDiskComponent { public String toString() { return "EmptyComponent"; } + + @Override + public void returnPages() { + // Do nothing + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java index 9346b56a80..392872477c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java @@ -21,7 +21,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import org.apache.hyracks.api.io.FileReference; -public final class LSMComponentFileReferences { +public class LSMComponentFileReferences { // The FileReference for the index that is used for inserting records of the component. For instance, this will be the FileReference of the RTree in one component of the LSM-RTree. private final FileReference insertIndexFileReference; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 6e7681da6c..b1660a500b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -267,7 +267,7 @@ public class LSMHarness implements ILSMHarness { if (inactiveMemoryComponentsToBeCleanedUp != null) { cleanupInactiveMemoryComponents(inactiveMemoryComponentsToBeCleanedUp); } - if (opType == LSMOperationType.FLUSH) { + if (opType == LSMOperationType.FLUSH && !failedOperation) { ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0); // We must call flushed without synchronizing on opTracker to avoid deadlocks flushingComponent.flushed(); @@ -577,6 +577,7 @@ public class LSMHarness implements ILSMHarness { // if the operation failed, we need to cleanup files if (operation.getStatus() == LSMIOOperationStatus.FAILURE) { operation.cleanup(lsmIndex.getBufferCache()); + operation.getCallback().afterFailure(operation); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java index 3e17cb040d..88fa10546d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java @@ -53,7 +53,7 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { try { componentBulkLoader.add(tuple); } catch (Throwable th) { - opCtx.getIoOperation().setFailure(th); + fail(th); throw th; } } @@ -63,7 +63,7 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { try { componentBulkLoader.delete(tuple); } catch (Throwable th) { - opCtx.getIoOperation().setFailure(th); + fail(th); throw th; } } @@ -83,6 +83,7 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { @Override public void abort() throws HyracksDataException { opCtx.getIoOperation().setStatus(LSMIOOperationStatus.FAILURE); + fail(null); try { try { componentBulkLoader.abort(); @@ -115,6 +116,7 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { componentBulkLoader.end(); } catch (Throwable th) { // NOSONAR Must not call afterFinalize without setting failure fail(th); + componentBulkLoader.abort(); throw th; } finally { lsmIndex.getIOOperationCallback().afterFinalize(opCtx.getIoOperation()); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMInvertedComponentFileReferences.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMInvertedComponentFileReferences.java new file mode 100644 index 0000000000..7d10c13b20 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMInvertedComponentFileReferences.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.io.FileReference; + +public class LSMInvertedComponentFileReferences extends LSMComponentFileReferences { + + private final FileReference invListsFileReference; + + public LSMInvertedComponentFileReferences(FileReference insertIndexFileReference, + FileReference deleteIndexFileReference, FileReference bloomFilterFileReference, + FileReference invListsFileReference) { + super(insertIndexFileReference, deleteIndexFileReference, bloomFilterFileReference); + this.invListsFileReference = invListsFileReference; + } + + public FileReference getInvListsFileReference() { + return invListsFileReference; + } + + @Override + public FileReference[] getFileReferences() { + return new FileReference[] { getInsertIndexFileReference(), getDeleteIndexFileReference(), + getBloomFilterFileReference(), invListsFileReference }; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index c768768c0a..52bd07ed8b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -41,6 +41,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.util.ExponentialRetryPolicy; +import org.apache.hyracks.util.IRetryPolicy; public class LSMTreeIndexAccessor implements ILSMIndexAccessor { @FunctionalInterface @@ -122,6 +124,18 @@ public class LSMTreeIndexAccessor implements ILSMIndexAccessor { @Override public void flush(ILSMIOOperation operation) throws HyracksDataException { lsmHarness.flush(operation); + if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) { + IRetryPolicy policy = new ExponentialRetryPolicy(); + while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) { + if (policy.retry(operation.getFailure())) { + operation.setFailure(null); + operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS); + lsmHarness.flush(operation); + } else { + break; + } + } + } } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java index 842ec61527..a8cbfdd400 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java @@ -169,6 +169,22 @@ public class ComponentUtils { } } + public static void returnPages(ITreeIndex treeIndex) { + treeIndex.getPageManager().returnAllPages(); + IBufferCache bufferCache = treeIndex.getBufferCache(); + // We need to return all pages to the buffer cache in case of a failure + try { + bufferCache.getCompressedPageWriter(treeIndex.getFileId()).abort(); + } catch (IllegalStateException | NullPointerException ignored) { + // Since we call this method in multiple places, it is possible that the writer + // is not in the State.WRITABLE, which would throw an IllegalStateException. + // This means the writer has already written all the pages. + // + // We also catch NullPointerException in case the writer is not initialized. + // Or if the compressed page writer is not applicable to this case + } + } + public static void markAsValid(IBufferCache bufferCache, BloomFilter filter, boolean forceToDisk) throws HyracksDataException { if (forceToDisk) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java index a828d229e1..f98a05629e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java @@ -18,6 +18,9 @@ */ package org.apache.hyracks.storage.am.lsm.invertedindex.impls; +import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.DELIMITER; +import static org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager.INVLISTS_SUFFIX; + import java.util.ArrayList; import java.util.List; @@ -286,46 +289,56 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex // Create a scan cursor on the deleted keys BTree underlying the in-memory inverted index. IIndexCursor deletedKeysScanCursor = deletedKeysBTreeAccessor.createSearchCursor(false); try { - deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred); try { - while (deletedKeysScanCursor.hasNext()) { - deletedKeysScanCursor.next(); - componentBulkLoader.delete(deletedKeysScanCursor.getTuple()); + deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred); + try { + while (deletedKeysScanCursor.hasNext()) { + deletedKeysScanCursor.next(); + componentBulkLoader.delete(deletedKeysScanCursor.getTuple()); + } + } finally { + deletedKeysScanCursor.close(); } } finally { - deletedKeysScanCursor.close(); + deletedKeysScanCursor.destroy(); } - } finally { - deletedKeysScanCursor.destroy(); - } - // Scan the in-memory inverted index - InMemoryInvertedIndexAccessor memInvIndexAccessor = - flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor(); - IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false); - try { - memBTreeAccessor.search(scanCursor, nullPred); - // Bulk load the disk inverted index from the in-memory inverted index. + // Scan the in-memory inverted index + InMemoryInvertedIndexAccessor memInvIndexAccessor = + flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); + BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor(); + IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false); try { - while (scanCursor.hasNext()) { - scanCursor.next(); - componentBulkLoader.add(scanCursor.getTuple()); + memBTreeAccessor.search(scanCursor, nullPred); + // Bulk load the disk inverted index from the in-memory inverted index. + try { + while (scanCursor.hasNext()) { + scanCursor.next(); + componentBulkLoader.add(scanCursor.getTuple()); + } + } finally { + scanCursor.close(); } } finally { - scanCursor.close(); + scanCursor.destroy(); } - } finally { - scanCursor.destroy(); - } - if (component.getLSMComponentFilter() != null) { - List<ITupleReference> filterTuples = new ArrayList<>(); - filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); - filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); - filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples, NoOpOperationCallback.INSTANCE); - filterManager.writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); + if (component.getLSMComponentFilter() != null) { + List<ITupleReference> filterTuples = new ArrayList<>(); + filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); + filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); + filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); + filterManager.writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); + } + flushingComponent.getMetadata().copy(component.getMetadata()); + componentBulkLoader.end(); + } catch (Throwable e) { + try { + componentBulkLoader.abort(); + } catch (Throwable th) { + e.addSuppressed(th); + } + throw e; } - flushingComponent.getMetadata().copy(component.getMetadata()); - componentBulkLoader.end(); return component; } @@ -339,60 +352,71 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex // Create an inverted index instance. ILSMDiskComponent component = createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getDeletedKeysBTreeTarget(), mergeOp.getBloomFilterTarget(), true); - ILSMDiskComponentBulkLoader componentBulkLoader; + ILSMDiskComponentBulkLoader componentBulkLoader = null; // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the inverted // indexes so that lsmHarness.endSearch() is called once when the inverted indexes have been merged. - if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents - .get(diskComponents.size() - 1)) { - // Keep the deleted tuples since the oldest disk component is not included in the merge operation - LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor = - new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx, mergeOp.getCursorStats()); - try { - long numElements = 0L; - for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { - numElements += ((LSMInvertedIndexDiskComponent) mergeOp.getMergingComponents().get(i)) - .getBloomFilter().getNumElements(); - } - componentBulkLoader = component.createBulkLoader(operation, 1.0f, false, numElements, false, false, - false, pageWriteCallbackFactory.createPageWriteCallback()); - loadDeleteTuples(opCtx, btreeCursor, mergePred, componentBulkLoader); - } finally { - btreeCursor.destroy(); - } - } else { - componentBulkLoader = component.createBulkLoader(operation, 1.0f, false, 0L, false, false, false, - pageWriteCallbackFactory.createPageWriteCallback()); - } - search(opCtx, cursor, mergePred); try { - while (cursor.hasNext()) { - cursor.next(); - componentBulkLoader.add(cursor.getTuple()); + if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents + .get(diskComponents.size() - 1)) { + // Keep the deleted tuples since the oldest disk component is not included in the merge operation + LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor = + new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx, mergeOp.getCursorStats()); + try { + long numElements = 0L; + for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { + numElements += ((LSMInvertedIndexDiskComponent) mergeOp.getMergingComponents().get(i)) + .getBloomFilter().getNumElements(); + } + componentBulkLoader = component.createBulkLoader(operation, 1.0f, false, numElements, false, false, + false, pageWriteCallbackFactory.createPageWriteCallback()); + loadDeleteTuples(opCtx, btreeCursor, mergePred, componentBulkLoader); + } finally { + btreeCursor.destroy(); + } + } else { + componentBulkLoader = component.createBulkLoader(operation, 1.0f, false, 0L, false, false, false, + pageWriteCallbackFactory.createPageWriteCallback()); } - } finally { + search(opCtx, cursor, mergePred); try { - cursor.close(); + while (cursor.hasNext()) { + cursor.next(); + componentBulkLoader.add(cursor.getTuple()); + } } finally { - cursor.destroy(); + try { + cursor.close(); + } finally { + cursor.destroy(); + } } - } - if (component.getLSMComponentFilter() != null) { - List<ITupleReference> filterTuples = new ArrayList<>(); - for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { - ITupleReference min = mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple(); - ITupleReference max = mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple(); - if (min != null) { - filterTuples.add(min); + if (component.getLSMComponentFilter() != null) { + List<ITupleReference> filterTuples = new ArrayList<>(); + for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { + ITupleReference min = mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple(); + ITupleReference max = mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple(); + if (min != null) { + filterTuples.add(min); + } + if (max != null) { + filterTuples.add(max); + } } - if (max != null) { - filterTuples.add(max); + getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); + getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); + } + componentBulkLoader.end(); + } catch (Throwable e) { + try { + if (componentBulkLoader != null) { + componentBulkLoader.abort(); } + } catch (Throwable th) { + e.addSuppressed(th); } - getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, - NoOpOperationCallback.INSTANCE); - getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); + throw e; } - componentBulkLoader.end(); return component; } @@ -487,7 +511,10 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex throws HyracksDataException { return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getHarness(), opCtx), componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), - componentFileRefs.getBloomFilterFileReference(), callback, getIndexIdentifier()); + componentFileRefs.getBloomFilterFileReference(), + ioManager.resolve( + getInvListsFilePath(componentFileRefs.getInsertIndexFileReference().getAbsolutePath())), + callback, getIndexIdentifier()); } @Override @@ -497,7 +524,14 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex IIndexCursorStats stats = new IndexCursorStats(); IIndexCursor cursor = new LSMInvertedIndexMergeCursor(opCtx, stats); return new LSMInvertedIndexMergeOperation(accessor, cursor, stats, mergeFileRefs.getInsertIndexFileReference(), - mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback, - getIndexIdentifier()); + mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), + ioManager.resolve(getInvListsFilePath(mergeFileRefs.getInsertIndexFileReference().getAbsolutePath())), + callback, getIndexIdentifier()); + } + + public String getInvListsFilePath(String dictBTreeFilePath) { + int index = dictBTreeFilePath.lastIndexOf(DELIMITER); + String file = dictBTreeFilePath.substring(0, index); + return file + DELIMITER + INVLISTS_SUFFIX; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index 67312ceb23..d8900ad2af 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -37,6 +37,8 @@ import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccesso import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.util.ExponentialRetryPolicy; +import org.apache.hyracks.util.IRetryPolicy; public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedIndexAccessor { @@ -93,6 +95,18 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd @Override public void flush(ILSMIOOperation operation) throws HyracksDataException { lsmHarness.flush(operation); + if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) { + IRetryPolicy policy = new ExponentialRetryPolicy(); + while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) { + if (policy.retry(operation.getFailure())) { + operation.setFailure(null); + operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS); + lsmHarness.flush(operation); + } else { + break; + } + } + } } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java index 6de6ab5b64..3edb5feead 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java @@ -105,17 +105,22 @@ public class LSMInvertedIndexDiskComponent extends AbstractLSMWithBuddyDiskCompo @Override public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException { - ComponentUtils.markAsValid(getBloomFilterBufferCache(), getBloomFilter(), persist); - - // Flush inverted index second. - invIndex.getBufferCache().force((invIndex).getInvListsFileId(), true); - ComponentUtils.markAsValid(getMetadataHolder(), persist, callback); - if (!callback.hasFailed()) { - // Flush deleted keys BTree. - ComponentUtils.markAsValid(getBuddyIndex(), persist, callback); - } - if (callback.hasFailed()) { - throw HyracksDataException.create(callback.getFailure()); + try { + ComponentUtils.markAsValid(getBloomFilterBufferCache(), getBloomFilter(), persist); + + // Flush inverted index second. + invIndex.getBufferCache().force((invIndex).getInvListsFileId(), true); + ComponentUtils.markAsValid(getMetadataHolder(), persist, callback); + if (!callback.hasFailed()) { + // Flush deleted keys BTree. + ComponentUtils.markAsValid(getBuddyIndex(), persist, callback); + } + if (callback.hasFailed()) { + throw HyracksDataException.create(callback.getFailure()); + } + } catch (HyracksDataException ex) { + returnPages(); + throw ex; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java index 3b8f82a0a0..442df0b39a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java @@ -34,6 +34,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManage import org.apache.hyracks.storage.am.lsm.common.impls.BTreeFactory; import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMInvertedComponentFileReferences; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper; // TODO: Refactor for better code sharing with other file managers. @@ -59,17 +60,19 @@ public class LSMInvertedIndexFileManager extends AbstractLSMIndexFileManager imp @Override public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException { String baseName = getNextComponentSequence(deletedKeysBTreeFilter); - return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX), + return new LSMInvertedComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + DELETED_KEYS_BTREE_SUFFIX), - baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX)); + baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX), + baseDir.getChild(baseName + DELIMITER + INVLISTS_SUFFIX)); } @Override public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) { final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName); - return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX), + return new LSMInvertedComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + DELETED_KEYS_BTREE_SUFFIX), - baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX)); + baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX), + baseDir.getChild(baseName + DELIMITER + INVLISTS_SUFFIX)); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java index 5b272eeea4..efd5060f91 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java @@ -24,17 +24,20 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMInvertedComponentFileReferences; public class LSMInvertedIndexFlushOperation extends FlushOperation { private final FileReference deletedKeysBTreeFlushTarget; private final FileReference bloomFilterFlushTarget; + private final FileReference invListsFlushTarget; public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, FileReference deletedKeysBTreeFlushTarget, FileReference bloomFilterFlushTarget, - ILSMIOOperationCallback callback, String indexIdentifier) { + FileReference invListsFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) { super(accessor, flushTarget, callback, indexIdentifier); this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget; this.bloomFilterFlushTarget = bloomFilterFlushTarget; + this.invListsFlushTarget = invListsFlushTarget; } public FileReference getDeletedKeysBTreeTarget() { @@ -47,6 +50,7 @@ public class LSMInvertedIndexFlushOperation extends FlushOperation { @Override public LSMComponentFileReferences getComponentFiles() { - return new LSMComponentFileReferences(target, deletedKeysBTreeFlushTarget, bloomFilterFlushTarget); + return new LSMInvertedComponentFileReferences(target, deletedKeysBTreeFlushTarget, bloomFilterFlushTarget, + invListsFlushTarget); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java index 31c79f0a62..4ebb9d3cc1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMInvertedComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.IIndexCursorStats; @@ -30,13 +31,15 @@ import org.apache.hyracks.storage.common.IIndexCursorStats; public class LSMInvertedIndexMergeOperation extends MergeOperation { private final FileReference deletedKeysBTreeMergeTarget; private final FileReference bloomFilterMergeTarget; + private final FileReference invListsMergeTarget; public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, IIndexCursorStats stats, FileReference target, FileReference deletedKeysBTreeMergeTarget, FileReference bloomFilterMergeTarget, - ILSMIOOperationCallback callback, String indexIdentifier) { + FileReference invListsMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) { super(accessor, target, callback, indexIdentifier, cursor, stats); this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; + this.invListsMergeTarget = invListsMergeTarget; } public FileReference getDeletedKeysBTreeTarget() { @@ -49,7 +52,8 @@ public class LSMInvertedIndexMergeOperation extends MergeOperation { @Override public LSMComponentFileReferences getComponentFiles() { - return new LSMComponentFileReferences(target, deletedKeysBTreeMergeTarget, bloomFilterMergeTarget); + return new LSMInvertedComponentFileReferences(target, deletedKeysBTreeMergeTarget, bloomFilterMergeTarget, + invListsMergeTarget); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java index 2583d0ab69..47068a30fe 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java @@ -397,6 +397,9 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { if (btreeBulkloader != null) { btreeBulkloader.abort(); } + if (currentPage != null) { + bufferCache.returnPage(currentPage, false); + } } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index a8a190988e..8b88e7ee51 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -110,7 +110,6 @@ public class LSMRTree extends AbstractLSMRTree { RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null); BTreeAccessor memBTreeAccessor = flushingComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - boolean abort = true; try { try { rTreeTupleSorter = getRTreeTupleSorter(flushingComponent, memBTreeAccessor, btreeNullPredicate, @@ -142,12 +141,16 @@ public class LSMRTree extends AbstractLSMRTree { } // Note. If we change the filter to write to metadata object, we don't need the if block above flushingComponent.getMetadata().copy(component.getMetadata()); - abort = false; componentBulkLoader.end(); - } finally { - if (abort && componentBulkLoader != null) { - componentBulkLoader.abort(); + } catch (Throwable e) { + try { + if (componentBulkLoader != null) { + componentBulkLoader.abort(); + } + } catch (Throwable th) { + e.addSuppressed(th); } + throw e; } return component; } @@ -264,32 +267,35 @@ public class LSMRTree extends AbstractLSMRTree { IIndexCursor cursor = mergeOp.getCursor(); ILSMDiskComponentBulkLoader componentBulkLoader = null; ILSMDiskComponent mergedComponent = null; - boolean abort = true; try { - mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getBTreeTarget(), - mergeOp.getBloomFilterTarget(), true); - componentBulkLoader = loadMergeBulkLoader(mergeOp, cursor, mergedComponent); - if (mergedComponent.getLSMComponentFilter() != null) { - List<ITupleReference> filterTuples = new ArrayList<>(); - for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { - filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple()); - filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple()); + try { + mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getBTreeTarget(), + mergeOp.getBloomFilterTarget(), true); + componentBulkLoader = loadMergeBulkLoader(mergeOp, cursor, mergedComponent); + if (mergedComponent.getLSMComponentFilter() != null) { + List<ITupleReference> filterTuples = new ArrayList<>(); + for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { + filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple()); + filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple()); + } + getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); + getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), + mergedComponent.getMetadataHolder()); } - getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples, - NoOpOperationCallback.INSTANCE); - getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), - mergedComponent.getMetadataHolder()); + componentBulkLoader.end(); + } finally { + cursor.destroy(); } - abort = false; - componentBulkLoader.end(); - } finally { + } catch (Throwable e) { try { - cursor.destroy(); - } finally { - if (abort && componentBulkLoader != null) { + if (componentBulkLoader != null) { componentBulkLoader.abort(); } + } catch (Throwable th) { + e.addSuppressed(th); } + throw e; } return mergedComponent; } @@ -297,46 +303,38 @@ public class LSMRTree extends AbstractLSMRTree { private ILSMDiskComponentBulkLoader loadMergeBulkLoader(LSMRTreeMergeOperation mergeOp, IIndexCursor cursor, ILSMDiskComponent mergedComponent) throws HyracksDataException { ILSMDiskComponentBulkLoader componentBulkLoader = null; - boolean abort = true; ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null); ILSMIndexOperationContext opCtx = ((LSMRTreeSortedCursor) cursor).getOpCtx(); search(opCtx, cursor, rtreeSearchPred); try { - try { - // In case we must keep the deleted-keys BTrees, then they must be merged - // *before* merging the r-trees so that - // lsmHarness.endSearch() is called once when the r-trees have been merged. - if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents - .get(diskComponents.size() - 1)) { - // Keep the deleted tuples since the oldest disk component - // is not included in the merge operation - long numElements = 0L; - for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { - numElements += ((LSMRTreeDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter() - .getNumElements(); - } - componentBulkLoader = mergedComponent.createBulkLoader(mergeOp, 1.0f, false, numElements, false, - false, false, pageWriteCallbackFactory.createPageWriteCallback()); - mergeLoadBTree(mergeOp, opCtx, rtreeSearchPred, componentBulkLoader); - } else { - //no buddy-btree needed - componentBulkLoader = mergedComponent.createBulkLoader(mergeOp, 1.0f, false, 0L, false, false, - false, pageWriteCallbackFactory.createPageWriteCallback()); - } - //search old rtree components - while (cursor.hasNext()) { - cursor.next(); - ITupleReference frameTuple = cursor.getTuple(); - componentBulkLoader.add(frameTuple); + // In case we must keep the deleted-keys BTrees, then they must be merged + // *before* merging the r-trees so that + // lsmHarness.endSearch() is called once when the r-trees have been merged. + if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents + .get(diskComponents.size() - 1)) { + // Keep the deleted tuples since the oldest disk component + // is not included in the merge operation + long numElements = 0L; + for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { + numElements += ((LSMRTreeDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter() + .getNumElements(); } - } finally { - cursor.close(); + componentBulkLoader = mergedComponent.createBulkLoader(mergeOp, 1.0f, false, numElements, false, false, + false, pageWriteCallbackFactory.createPageWriteCallback()); + mergeLoadBTree(mergeOp, opCtx, rtreeSearchPred, componentBulkLoader); + } else { + //no buddy-btree needed + componentBulkLoader = mergedComponent.createBulkLoader(mergeOp, 1.0f, false, 0L, false, false, false, + pageWriteCallbackFactory.createPageWriteCallback()); } - abort = false; - } finally { - if (abort && componentBulkLoader != null) { - componentBulkLoader.abort(); + //search old rtree components + while (cursor.hasNext()) { + cursor.next(); + ITupleReference frameTuple = cursor.getTuple(); + componentBulkLoader.add(frameTuple); } + } finally { + cursor.close(); } return componentBulkLoader; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java index 729ca74363..6968b3c3c3 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java @@ -103,107 +103,111 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { TreeTupleSorter rTreeTupleSorter = null; TreeTupleSorter bTreeTupleSorter = null; boolean isEmpty = true; - boolean abort = true; try { - RTreeAccessor memRTreeAccessor = - flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - try { - RTreeSearchCursor rtreeScanCursor = memRTreeAccessor.createSearchCursor(false); + RTreeAccessor memRTreeAccessor = + flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); + try { - memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate); - component = createDiskComponent(componentFactory, flushOp.getTarget(), null, null, true); - componentBulkLoader = component.createBulkLoader(operation, 1.0f, false, 0L, false, false, false, - pageWriteCallbackFactory.createPageWriteCallback()); - // Since the LSM-RTree is used as a secondary assumption, the - // primary key will be the last comparator in the BTree comparators - rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getIndex().getFileId(), linearizerArray, - rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(), - flushingComponent.getIndex().getBufferCache(), comparatorFields); + RTreeSearchCursor rtreeScanCursor = memRTreeAccessor.createSearchCursor(false); try { - isEmpty = scanAndSort(rtreeScanCursor, rTreeTupleSorter); + memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate); + component = createDiskComponent(componentFactory, flushOp.getTarget(), null, null, true); + componentBulkLoader = component.createBulkLoader(operation, 1.0f, false, 0L, false, false, + false, pageWriteCallbackFactory.createPageWriteCallback()); + // Since the LSM-RTree is used as a secondary assumption, the + // primary key will be the last comparator in the BTree comparators + rTreeTupleSorter = + new TreeTupleSorter(flushingComponent.getIndex().getFileId(), linearizerArray, + rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(), + flushingComponent.getIndex().getBufferCache(), comparatorFields); + try { + isEmpty = scanAndSort(rtreeScanCursor, rTreeTupleSorter); + } finally { + rtreeScanCursor.close(); + } } finally { - rtreeScanCursor.close(); + rtreeScanCursor.destroy(); } } finally { - rtreeScanCursor.destroy(); + memRTreeAccessor.destroy(); } - } finally { - memRTreeAccessor.destroy(); - } - if (!isEmpty) { - rTreeTupleSorter.sort(); - } - // scan the memory BTree - RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null); - BTreeAccessor memBTreeAccessor = - flushingComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - try { - bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBuddyIndex().getFileId(), linearizerArray, - btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(), - flushingComponent.getBuddyIndex().getBufferCache(), comparatorFields); - BTreeRangeSearchCursor btreeScanCursor = - (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false); + if (!isEmpty) { + rTreeTupleSorter.sort(); + } + // scan the memory BTree + RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null); + BTreeAccessor memBTreeAccessor = + flushingComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); try { - isEmpty = true; - memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate); + bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBuddyIndex().getFileId(), + linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(), + flushingComponent.getBuddyIndex().getBufferCache(), comparatorFields); + BTreeRangeSearchCursor btreeScanCursor = + (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false); try { - isEmpty = scanAndSort(btreeScanCursor, bTreeTupleSorter); + isEmpty = true; + memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate); + try { + isEmpty = scanAndSort(btreeScanCursor, bTreeTupleSorter); + } finally { + btreeScanCursor.close(); + } } finally { - btreeScanCursor.close(); + btreeScanCursor.destroy(); } } finally { - btreeScanCursor.destroy(); + memBTreeAccessor.destroy(); } - } finally { - memBTreeAccessor.destroy(); - } - if (!isEmpty) { - bTreeTupleSorter.sort(); - } - LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new LSMRTreeWithAntiMatterTuplesFlushCursor( - rTreeTupleSorter, bTreeTupleSorter, comparatorFields, linearizerArray); - try { - cursor.open(null, null); + if (!isEmpty) { + bTreeTupleSorter.sort(); + } + LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new LSMRTreeWithAntiMatterTuplesFlushCursor( + rTreeTupleSorter, bTreeTupleSorter, comparatorFields, linearizerArray); try { - while (cursor.hasNext()) { - cursor.next(); - ITupleReference frameTuple = cursor.getTuple(); - componentBulkLoader.add(frameTuple); + cursor.open(null, null); + try { + while (cursor.hasNext()) { + cursor.next(); + ITupleReference frameTuple = cursor.getTuple(); + componentBulkLoader.add(frameTuple); + } + } finally { + cursor.close(); } } finally { - cursor.close(); + cursor.destroy(); } - } finally { - cursor.destroy(); - } - if (component.getLSMComponentFilter() != null) { - List<ITupleReference> filterTuples = new ArrayList<>(); - filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); - filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); - getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, - NoOpOperationCallback.INSTANCE); - getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); - } - flushingComponent.getMetadata().copy(component.getMetadata()); - abort = false; - componentBulkLoader.end(); - } finally { - try { - if (rTreeTupleSorter != null) { - rTreeTupleSorter.destroy(); + if (component.getLSMComponentFilter() != null) { + List<ITupleReference> filterTuples = new ArrayList<>(); + filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); + filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); + getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); + getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); } + flushingComponent.getMetadata().copy(component.getMetadata()); + componentBulkLoader.end(); } finally { try { - if (bTreeTupleSorter != null) { - bTreeTupleSorter.destroy(); + if (rTreeTupleSorter != null) { + rTreeTupleSorter.destroy(); } } finally { - if (abort && componentBulkLoader != null) { - componentBulkLoader.abort(); + if (bTreeTupleSorter != null) { + bTreeTupleSorter.destroy(); } } } + } catch (Throwable e) { + try { + if (componentBulkLoader != null) { + componentBulkLoader.abort(); + } + } catch (Throwable th) { + e.addSuppressed(th); + } + throw e; } return component; } @@ -243,26 +247,35 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree { ILSMDiskComponentBulkLoader componentBulkLoader = component.createBulkLoader(operation, 1.0f, false, 0L, false, false, false, pageWriteCallbackFactory.createPageWriteCallback()); try { - while (cursor.hasNext()) { - cursor.next(); - ITupleReference frameTuple = cursor.getTuple(); - componentBulkLoader.add(frameTuple); + try { + while (cursor.hasNext()) { + cursor.next(); + ITupleReference frameTuple = cursor.getTuple(); + componentBulkLoader.add(frameTuple); + } + } finally { + cursor.close(); } - } finally { - cursor.close(); - } - if (component.getLSMComponentFilter() != null) { - List<ITupleReference> filterTuples = new ArrayList<>(); - for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { - filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple()); - filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple()); + if (component.getLSMComponentFilter() != null) { + List<ITupleReference> filterTuples = new ArrayList<>(); + for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { + filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple()); + filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple()); + } + getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); + getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); } - getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, - NoOpOperationCallback.INSTANCE); - getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); - } - componentBulkLoader.end(); + componentBulkLoader.end(); + } catch (Throwable e) { + try { + componentBulkLoader.abort(); + } catch (Throwable th) { + e.addSuppressed(th); + } + throw e; + } return component; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java index 35785d3e48..33ba01ecdc 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java @@ -976,17 +976,22 @@ public class RTree extends AbstractTreeIndex { @Override public void end() throws HyracksDataException { - pagesToWrite.clear(); - //if writing a trivial 1-page tree, don't try and propagate up - if (nodeFrontiers.size() > 1) { - propagateBulk(1, true, pagesToWrite); - } + try { + pagesToWrite.clear(); + //if writing a trivial 1-page tree, don't try and propagate up + if (nodeFrontiers.size() > 1) { + propagateBulk(1, true, pagesToWrite); + } - for (ICachedPage c : pagesToWrite) { - write(c); + for (ICachedPage c : pagesToWrite) { + write(c); + } + finish(); + super.end(); + } catch (HyracksDataException e) { + handleException(); + throw e; } - finish(); - super.end(); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 4c2395fd12..a4ecb6f490 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -499,11 +499,13 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent, I synchronized (cachedPages) { for (ICachedPageInternal internalPage : cachedPages) { CachedPage c = (CachedPage) internalPage; - if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) { - return false; - } - if (c.valid) { - reachableDpids.add(c.dpid); + if (c != null) { + if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) { + return false; + } + if (c.valid) { + reachableDpids.add(c.dpid); + } } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java index 7d4e119bef..219275e4b9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java @@ -15,6 +15,8 @@ package org.apache.hyracks.storage.common.buffercache; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext; import org.apache.hyracks.util.ExitUtil; import org.apache.logging.log4j.LogManager; @@ -38,18 +40,21 @@ public class FIFOLocalWriter implements IFIFOPageWriter { this.context = context; } - @SuppressWarnings("squid:S1181") // System must halt on all IO errors + @SuppressWarnings("squid:S1181") @Override - public void write(ICachedPage page) { + public void write(ICachedPage page) throws HyracksDataException { CachedPage cPage = (CachedPage) page; try { callback.beforeWrite(cPage); bufferCache.write(cPage, context); callback.afterWrite(cPage); + } catch (HyracksDataException e) { + LOGGER.warn("Failed to write page {}", cPage, e); + throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, e); } catch (Throwable th) { // Halt - LOGGER.error("Failed to write page {}", cPage, th); - ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED); + LOGGER.error("FIFOLocalWriter has encountered a fatal error", th); + ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION); } finally { bufferCache.returnPage(cPage); if (DEBUG) { @@ -57,5 +62,4 @@ public class FIFOLocalWriter implements IFIFOPageWriter { } } } - } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java index 31e0e85b1f..2dc4a585f5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java @@ -15,7 +15,9 @@ package org.apache.hyracks.storage.common.buffercache; +import org.apache.hyracks.api.exceptions.HyracksDataException; + @FunctionalInterface public interface IFIFOPageWriter { - void write(ICachedPage page); + void write(ICachedPage page) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java index b332a2de14..080b9ea16a 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java @@ -21,22 +21,31 @@ package org.apache.hyracks.util; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + public class ExponentialRetryPolicy implements IRetryPolicy { + private static final Logger LOGGER = LogManager.getLogger(); + + public static final String CLOUD_UNSTABLE_MODE = "cloud.unstable.mode"; private static final int DEFAULT_MAX_RETRIES = 10; private static final long DEFAULT_INITIAL_DELAY_IN_MILLIS = 100; private static final long DEFAULT_MAX_DELAY_IN_MILLIS = Long.MAX_VALUE - 1; + private static final int UNSTABLE_NUMBER_OF_RETRIES = 100; private final int maxRetries; private final long maxDelay; private int attempt = 0; private long delay; + private boolean printDebugLines = true; /** * Default constructor for ExponentialRetryPolicy. * Initializes with default max retries, initial delay, and max delay. */ public ExponentialRetryPolicy() { - this(DEFAULT_MAX_RETRIES, DEFAULT_INITIAL_DELAY_IN_MILLIS, DEFAULT_MAX_DELAY_IN_MILLIS); + this(isUnstable() ? UNSTABLE_NUMBER_OF_RETRIES : DEFAULT_MAX_RETRIES, DEFAULT_INITIAL_DELAY_IN_MILLIS, + isUnstable() ? 0 : DEFAULT_MAX_DELAY_IN_MILLIS); } /** @@ -71,13 +80,18 @@ public class ExponentialRetryPolicy implements IRetryPolicy { */ public ExponentialRetryPolicy(int maxRetries, long maxDelay) { this(maxRetries, DEFAULT_INITIAL_DELAY_IN_MILLIS, maxDelay); + printDebugLines = false; } @Override public boolean retry(Throwable failure) { if (attempt < maxRetries) { try { - TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1 + delay)); + long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay); + if (printDebugLines) { + LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries); + } + TimeUnit.MILLISECONDS.sleep(sleepTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -87,4 +101,8 @@ public class ExponentialRetryPolicy implements IRetryPolicy { } return false; } + + private static boolean isUnstable() { + return Boolean.getBoolean(CLOUD_UNSTABLE_MODE); + } }
