This is an automated email from the ASF dual-hosted git repository.
ritik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new af29cbe759 [ASTERIXDB-3564][STO]: Avoid halts on IO operation failures
af29cbe759 is described below
commit af29cbe759a8e9a8b865cad70fac01737f6ceb5b
Author: Savyasach Reddy <[email protected]>
AuthorDate: Thu Jan 16 17:38:06 2025 +0530
[ASTERIXDB-3564][STO]: Avoid halts on IO operation failures
- user model changes: no
- storage format changes: no
- interface changes: yes
details:
- Retry Flush Operations on IOException
- Avoid Halts on Merge failure
- Make Create Index operation retryable
Ext-ref: MB-63040
Change-Id: If253ea03f5baecbab226e527abb4267670a4233e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19187
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Ritik Raj <[email protected]>
Tested-by: Ritik Raj <[email protected]>
---
asterixdb/asterix-app/pom.xml | 4 +-
.../org/apache/asterix/app/nc/HaltCallback.java | 11 +-
.../asterix/app/translator/QueryTranslator.java | 6 +-
.../test/cloud_storage/CloudStorageGCSTest.java | 2 +
...STest.java => GCSCloudStorageUnstableTest.java} | 52 +++---
.../cloud_storage/RetryingQueryTranslator.java | 157 ++++++++++++++++
.../UnstableStatementExecutorExtension.java | 55 ++++++
.../UnstableStatementExecutorFactory.java | 46 +++++
.../src/test/resources/cc-cloud-storage-gcs.conf | 2 +
.../asterix/cloud/CloudResettableInputStream.java | 7 +-
.../asterix/cloud/clients/UnstableCloudClient.java | 70 +++++++-
.../cloud/clients/aws/s3/S3BufferedWriter.java | 3 +-
.../cloud/clients/google/gcs/GCSWriter.java | 10 +-
.../column/metadata/schema/ObjectSchemaNode.java | 5 +-
.../operation/lsm/flush/FlushColumnMetadata.java | 7 +-
.../ioopcallbacks/LSMIOOperationCallback.java | 11 ++
.../LSMSecondaryIndexBulkLoadNodePushable.java | 19 +-
.../apache/hyracks/api/exceptions/ErrorCode.java | 1 +
.../src/main/resources/errormsg/en.properties | 1 +
.../storage/am/bloomfilter/impls/BloomFilter.java | 24 ++-
.../storage/am/common/api/IPageManager.java | 6 +
.../IndexBulkLoadOperatorNodePushable.java | 22 ++-
.../AppendOnlyLinkedMetadataPageManager.java | 7 +
.../common/impls/AbstractTreeIndexBulkLoader.java | 1 +
.../storage/am/lsm/btree/impls/LSMBTree.java | 122 +++++++------
.../AbstractLSMWithBloomFilterDiskComponent.java | 9 +-
.../api/AbstractLSMWithBuddyDiskComponent.java | 15 +-
.../am/lsm/common/api/ILSMDiskComponent.java | 5 +
.../am/lsm/common/api/ILSMIOOperationCallback.java | 6 +
.../lsm/common/impls/AbstractLSMDiskComponent.java | 14 +-
.../am/lsm/common/impls/AbstractLSMIndex.java | 7 +-
.../impls/ChainedLSMDiskComponentBulkLoader.java | 17 +-
.../am/lsm/common/impls/EmptyComponent.java | 5 +
.../common/impls/LSMComponentFileReferences.java | 2 +-
.../storage/am/lsm/common/impls/LSMHarness.java | 3 +-
.../impls/LSMIndexDiskComponentBulkLoader.java | 6 +-
.../impls/LSMInvertedComponentFileReferences.java | 43 +++++
.../am/lsm/common/impls/LSMTreeIndexAccessor.java | 14 ++
.../storage/am/lsm/common/util/ComponentUtils.java | 16 ++
.../lsm/invertedindex/impls/LSMInvertedIndex.java | 188 ++++++++++++--------
.../impls/LSMInvertedIndexAccessor.java | 14 ++
.../impls/LSMInvertedIndexDiskComponent.java | 27 +--
.../impls/LSMInvertedIndexFileManager.java | 11 +-
.../impls/LSMInvertedIndexFlushOperation.java | 8 +-
.../impls/LSMInvertedIndexMergeOperation.java | 8 +-
.../invertedindex/ondisk/OnDiskInvertedIndex.java | 3 +
.../storage/am/lsm/rtree/impls/LSMRTree.java | 112 ++++++------
.../rtree/impls/LSMRTreeWithAntiMatterTuples.java | 197 +++++++++++----------
.../hyracks/storage/am/rtree/impls/RTree.java | 23 ++-
.../storage/common/buffercache/BufferCache.java | 12 +-
.../common/buffercache/FIFOLocalWriter.java | 14 +-
.../common/buffercache/IFIFOPageWriter.java | 4 +-
.../hyracks/util/ExponentialRetryPolicy.java | 22 ++-
53 files changed, 1055 insertions(+), 401 deletions(-)
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 35344ab195..ab938d8384 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -486,7 +486,7 @@
**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,
**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,
**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,
-
**/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java,
+
**/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java,**/GCSCloudStorageUnstableTest,
**/AzureBlobStorageExternalDatasetOnePartitionTest.java,**/SqlppSinglePointLookupExecutionTest.java,
**/Atomic*.java, **/AwsS3*.java, **/*SqlppHdfs*.java,
**/*RQGTest.java, **/*RQJTest.java
</test.excludes>
@@ -553,7 +553,7 @@
<id>asterix-gerrit-cloud-nons3-tests</id>
<properties>
<test.includes>
- **/CloudStorageGCSTest.java,
**/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java,
+
**/GCSCloudStorageUnstableTest.java,**/CloudStorageAzTest.java,**/AzureBlobStorageExternalDatasetTest.java,
**/AzureBlobStorageExternalDatasetOnePartitionTest.java,**/CloudStorageUnstableTest.java,
**/*SqlppHdfs*.java
</test.includes>
<failIfNoTests>false</failIfNoTests>
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index 466b617783..f66f097b4f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -47,12 +47,9 @@ public class HaltCallback implements
IIoOperationFailedCallback {
}
private boolean haltOnFailure(ILSMIOOperation operation) {
- switch (operation.getIOOperationType()) {
- case CLEANUP:
- case REPLICATE:
- return false;
- default:
- return true;
- }
+ return switch (operation.getIOOperationType()) {
+ case CLEANUP, REPLICATE, MERGE -> false;
+ default -> true;
+ };
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f7a6dc12af..fde71f8c71 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -481,7 +481,7 @@ public class QueryTranslator extends AbstractLangTranslator
implements IStatemen
if (stats.getProfileType() == Stats.ProfileType.FULL) {
this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
}
- handleLoadStatement(metadataProvider, stmt, hcc);
+ handleLoadStatement(metadataProvider, stmt, hcc,
requestParameters);
break;
case COPY_FROM:
if (stats.getProfileType() == Stats.ProfileType.FULL) {
@@ -4025,8 +4025,8 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
}
}
- protected void handleLoadStatement(MetadataProvider metadataProvider,
Statement stmt, IHyracksClientConnection hcc)
- throws Exception {
+ protected void handleLoadStatement(MetadataProvider metadataProvider,
Statement stmt, IHyracksClientConnection hcc,
+ IRequestParameters requestParameters) throws Exception {
LoadStatement loadStmt = (LoadStatement) stmt;
String datasetName = loadStmt.getDatasetName();
metadataProvider.validateDatabaseObjectName(loadStmt.getNamespace(),
datasetName, loadStmt.getSourceLocation());
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
index 65b5adf5e6..8c071f08a1 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
@@ -40,6 +40,7 @@ import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
@@ -57,6 +58,7 @@ import com.google.cloud.storage.StorageOptions;
* Run tests in cloud deployment environment
*/
@RunWith(Parameterized.class)
+@Ignore
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class CloudStorageGCSTest {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/GCSCloudStorageUnstableTest.java
similarity index 73%
copy from
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
copy to
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/GCSCloudStorageUnstableTest.java
index 65b5adf5e6..c97e459fe3 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/GCSCloudStorageUnstableTest.java
@@ -16,27 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.asterix.test.cloud_storage;
import static
org.apache.asterix.api.common.LocalCloudUtil.CLOUD_STORAGE_BUCKET;
import static org.apache.asterix.api.common.LocalCloudUtil.MOCK_SERVER_REGION;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Objects;
-import java.util.Random;
import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.storage.StorageIOStats;
import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
import org.apache.asterix.test.runtime.LangExecutionUtil;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.asterix.testframework.xml.Description;
import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
@@ -44,7 +51,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import com.google.cloud.NoCredentials;
import com.google.cloud.storage.Blob;
@@ -58,7 +64,7 @@ import com.google.cloud.storage.StorageOptions;
*/
@RunWith(Parameterized.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class CloudStorageGCSTest {
+public class GCSCloudStorageUnstableTest {
private static final Logger LOGGER = LogManager.getLogger();
@@ -71,13 +77,14 @@ public class CloudStorageGCSTest {
public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:24443";
private static final String MOCK_SERVER_PROJECT_ID =
"asterixdb-gcs-test-project-id";
- public CloudStorageGCSTest(TestCaseContext tcCtx) {
+ public GCSCloudStorageUnstableTest(TestCaseContext tcCtx) {
this.tcCtx = tcCtx;
}
@BeforeClass
public static void setUp() throws Exception {
LocalCloudUtilAdobeMock.startS3CloudEnvironment(true, true);
+ System.setProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE,
"true");
Storage storage =
StorageOptions.newBuilder().setHost(MOCK_SERVER_HOSTNAME)
.setCredentials(NoCredentials.getInstance()).setProjectId(MOCK_SERVER_PROJECT_ID).build().getService();
cleanup(storage);
@@ -92,27 +99,13 @@ public class CloudStorageGCSTest {
@AfterClass
public static void tearDown() throws Exception {
+ System.clearProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE);
LangExecutionUtil.tearDown();
- LocalCloudUtilAdobeMock.shutdownSilently();
}
- @Parameters(name = "CloudStorageGCSTest {index}: {0}")
+ @Parameterized.Parameters(name = "GCSCloudStorageUnstableTest {index}:
{0}")
public static Collection<Object[]> tests() throws Exception {
- long seed = System.nanoTime();
- Random random = new Random(seed);
- LOGGER.info("CloudStorageGCSTest seed {}", seed);
- Collection<Object[]> tests = LangExecutionUtil.tests(ONLY_TESTS,
SUITE_TESTS);
- List<Object[]> selected = new ArrayList<>();
- for (Object[] test : tests) {
- if (!Objects.equals(((TestCaseContext)
test[0]).getTestGroups()[0].getName(), "sqlpp_queries")) {
- selected.add(test);
- }
- // Select 10% of the tests randomly
- else if (random.nextInt(10) == 0) {
- selected.add(test);
- }
- }
- return selected;
+ return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
}
@Test
@@ -120,6 +113,19 @@ public class CloudStorageGCSTest {
List<TestCase.CompilationUnit> cu =
tcCtx.getTestCase().getCompilationUnit();
Assume.assumeTrue(cu.size() > 1 ||
!EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
LangExecutionUtil.test(tcCtx);
+ for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs)
{
+ IDatasetLifecycleManager lifecycleManager =
+ ((INcApplicationContext)
nc.getApplicationContext()).getDatasetLifecycleManager();
+ StorageIOStats stats = lifecycleManager.getDatasetsIOStats();
+ while (stats.getPendingFlushes() != 0 || stats.getPendingMerges()
!= 0) {
+ stats = lifecycleManager.getDatasetsIOStats();
+ }
+ }
+ IBufferCache bufferCache;
+ for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs)
{
+ bufferCache = ((INcApplicationContext)
nc.getApplicationContext()).getBufferCache();
+ Assert.assertTrue(((BufferCache) bufferCache).isClean());
+ }
}
private static String getText(Description description) {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/RetryingQueryTranslator.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/RetryingQueryTranslator.java
new file mode 100644
index 0000000000..af580b26e8
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/RetryingQueryTranslator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.cloud_storage;
+
+import static org.apache.asterix.cloud.clients.UnstableCloudClient.ERROR_RATE;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.api.IResponsePrinter;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.LoadStatement;
+import org.apache.asterix.lang.common.statement.TruncateDatasetStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.utils.Creator;
+import org.apache.asterix.translator.IRequestParameters;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class RetryingQueryTranslator extends QueryTranslator {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ public RetryingQueryTranslator(ICcApplicationContext appCtx,
List<Statement> statements, SessionOutput output,
+ ILangCompilationProvider compilationProvider, ExecutorService
executorService,
+ IResponsePrinter responsePrinter) {
+ super(appCtx, statements, output, compilationProvider,
executorService, responsePrinter);
+ }
+
+ @Override
+ public void handleCreateIndexStatement(MetadataProvider metadataProvider,
Statement stmt,
+ IHyracksClientConnection hcc, IRequestParameters
requestParameters, Creator creator) throws Exception {
+ int times = 100;
+ Exception ex = null;
+ double initialErrorRate = ERROR_RATE.get();
+ try {
+ while (times-- > 0) {
+ try {
+ super.handleCreateIndexStatement(metadataProvider, stmt,
hcc, requestParameters, creator);
+ ex = null;
+ break;
+ } catch (Exception e) {
+ ERROR_RATE.set(Double.max(ERROR_RATE.get() - 0.01d,
0.01d));
+ if (retryOnFailure(e)) {
+ LOGGER.error("Attempt: {}, Failed to create index",
100 - times, e);
+ metadataProvider.getLocks().reset();
+ ex = e;
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (ex != null) {
+ throw ex;
+ }
+ } finally {
+ ERROR_RATE.set(initialErrorRate);
+ }
+ }
+
+ @Override
+ public void handleAnalyzeStatement(MetadataProvider metadataProvider,
Statement stmt, IHyracksClientConnection hcc,
+ IRequestParameters requestParameters) throws Exception {
+ int times = 100;
+ Exception ex = null;
+ while (times-- > 0) {
+ try {
+ super.handleAnalyzeStatement(metadataProvider, stmt, hcc,
requestParameters);
+ ex = null;
+ break;
+ } catch (Exception e) {
+ if (retryOnFailure(e)) {
+ LOGGER.error("Attempt: {}, Failed to create index", 100 -
times, e);
+ metadataProvider.getLocks().reset();
+ ex = e;
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
+ @Override
+ public void handleLoadStatement(MetadataProvider metadataProvider,
Statement stmt, IHyracksClientConnection hcc,
+ IRequestParameters requestParameters) throws Exception {
+ int times = 100;
+ Exception ex = null;
+ double initialErrorRate = ERROR_RATE.get();
+ try {
+ while (times-- > 0) {
+ try {
+ super.handleLoadStatement(metadataProvider, stmt, hcc,
requestParameters);
+ ex = null;
+ break;
+ } catch (Exception e) {
+ ERROR_RATE.set(Double.max(ERROR_RATE.get() - 0.01d,
0.01d));
+ if (retryOnFailure(e)) {
+ LOGGER.error("Attempt: {}, Failed to load", 100 -
times, e);
+ metadataProvider.getLocks().reset();
+ ex = e;
+ LoadStatement loadStmt = (LoadStatement) stmt;
+ TruncateDatasetStatement truncateDatasetStatement =
new TruncateDatasetStatement(
+ loadStmt.getNamespace(), new
Identifier(loadStmt.getDatasetName()), true);
+ super.handleDatasetTruncateStatement(metadataProvider,
truncateDatasetStatement,
+ requestParameters);
+ metadataProvider.getLocks().reset();
+
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (ex != null) {
+ throw ex;
+ }
+ } finally {
+ ERROR_RATE.set(initialErrorRate);
+ }
+ }
+
+ private boolean retryOnFailure(Exception e) {
+ if (e instanceof HyracksDataException) {
+ return ((HyracksDataException) e).getErrorCode() ==
ErrorCode.FAILED_IO_OPERATION.intValue()
+ &&
ExceptionUtils.getRootCause(e).getMessage().contains("Simulated error");
+ }
+ return false;
+ }
+}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorExtension.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorExtension.java
new file mode 100644
index 0000000000..bf9aecbb83
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorExtension.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.cloud_storage;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.asterix.app.cc.IStatementExecutorExtension;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.IServiceContext;
+
+public class UnstableStatementExecutorExtension implements
IStatementExecutorExtension {
+
+ public static final ExtensionId RETRYING_QUERY_TRANSLATOR_EXTENSION_ID =
+ new
ExtensionId(UnstableStatementExecutorExtension.class.getSimpleName(), 0);
+
+ @Override
+ public ExtensionId getId() {
+ return RETRYING_QUERY_TRANSLATOR_EXTENSION_ID;
+ }
+
+ @Override
+ public void configure(List<Pair<String, String>> args, IServiceContext
serviceCtx) {
+
+ }
+
+ @Override
+ public IStatementExecutorFactory getQueryTranslatorFactory() {
+ return null;
+ }
+
+ @Override
+ public IStatementExecutorFactory
getStatementExecutorFactory(ExecutorService executorService) {
+ return new UnstableStatementExecutorFactory(executorService);
+ }
+}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorFactory.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorFactory.java
new file mode 100644
index 0000000000..f65276bb2d
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/UnstableStatementExecutorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.cloud_storage;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.common.api.IResponsePrinter;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.translator.SessionOutput;
+
+public class UnstableStatementExecutorFactory extends
DefaultStatementExecutorFactory {
+ public UnstableStatementExecutorFactory(ExecutorService executorService) {
+ super(executorService);
+ }
+
+ @Override
+ public QueryTranslator create(ICcApplicationContext appCtx,
List<Statement> statements, SessionOutput output,
+ ILangCompilationProvider compilationProvider,
IStorageComponentProvider storageComponentProvider,
+ IResponsePrinter responsePrinter) {
+ return new RetryingQueryTranslator(appCtx, statements, output,
compilationProvider, executorService,
+ responsePrinter);
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
index 9b6f5477cb..376252e690 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
@@ -73,3 +73,5 @@ cloud.storage.anonymous.auth=true
cloud.storage.cache.policy=selective
cloud.max.write.requests.per.second=1000
cloud.max.read.requests.per.second=5000
+
+[extension/org.apache.asterix.test.cloud_storage.UnstableStatementExecutorExtension]
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index a233ca5afb..9b832a2018 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
import org.apache.hyracks.cloud.io.request.ICloudRequest;
@@ -158,7 +159,7 @@ public class CloudResettableInputStream extends InputStream
implements ICloudWri
CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, retry);
} catch (Exception e) {
LOGGER.error(e);
- throw HyracksDataException.create(e);
+ throw
HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION, e);
}
}
bufferedWriter.finish();
@@ -190,7 +191,7 @@ public class CloudResettableInputStream extends InputStream
implements ICloudWri
try {
close();
} catch (IOException e) {
- throw HyracksDataException.create(e);
+ throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION,
e);
}
}
@@ -203,7 +204,7 @@ public class CloudResettableInputStream extends InputStream
implements ICloudWri
CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request,
retry);
} catch (Exception e) {
LOGGER.error(e);
- throw HyracksDataException.create(e);
+ throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION,
e);
}
writeBuffer.clear();
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
index 28fa53e5f3..1b39251334 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -25,11 +25,15 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.asterix.cloud.CloudResettableInputStream;
import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.apache.asterix.cloud.clients.google.gcs.GCSWriter;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.control.nc.io.IOManager;
@@ -39,7 +43,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class UnstableCloudClient implements ICloudClient {
// 10% error rate
- private static final double ERROR_RATE = 0.1d;
+ public static final AtomicReference<Double> ERROR_RATE = new
AtomicReference<>(0.11d);
private static final Random RANDOM = new Random(0);
private final ICloudClient cloudClient;
@@ -61,6 +65,9 @@ public class UnstableCloudClient implements ICloudClient {
public ICloudWriter createWriter(String bucket, String path,
IWriteBufferProvider bufferProvider) {
if (cloudClient instanceof S3CloudClient) {
return createUnstableWriter((S3CloudClient) cloudClient, bucket,
path, bufferProvider);
+ } else if (cloudClient instanceof GCSCloudClient) {
+ return new UnstableGCSCloudWriter(cloudClient.createWriter(bucket,
path, bufferProvider),
+ cloudClient.getWriteBufferSize());
}
return cloudClient.createWriter(bucket, path, bufferProvider);
}
@@ -138,8 +145,8 @@ public class UnstableCloudClient implements ICloudClient {
private static void fail() throws HyracksDataException {
double prob = RANDOM.nextInt(100) / 100.0d;
- if (prob <= ERROR_RATE) {
- throw HyracksDataException.create(new IOException("Simulated
error"));
+ if (prob < ERROR_RATE.get()) {
+ throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION,
new IOException("Simulated error"));
}
}
@@ -150,6 +157,63 @@ public class UnstableCloudClient implements ICloudClient {
return new CloudResettableInputStream(bufferedWriter, bufferProvider);
}
+ /**
+ * An unstable cloud writer that mimics the functionality of {@link
GCSWriter}
+ */
+ private static class UnstableGCSCloudWriter implements ICloudWriter {
+ private final ICloudWriter writer;
+ private final int writeBufferSize;
+
+ UnstableGCSCloudWriter(ICloudWriter writer, int writeBufferSize) {
+ this.writer = writer;
+ this.writeBufferSize = writeBufferSize;
+ }
+
+ @Override
+ public int write(ByteBuffer header, ByteBuffer page) throws
HyracksDataException {
+ return write(header) + write(page);
+ }
+
+ @Override
+ public int write(ByteBuffer page) throws HyracksDataException {
+ if (position() == 0) {
+ fail();
+ }
+ long uploadsToBeTriggered =
+ ((position() + page.remaining()) / writeBufferSize) -
(position() / writeBufferSize);
+ while (uploadsToBeTriggered-- > 0) {
+ fail();
+ }
+ return writer.write(page);
+ }
+
+ @Override
+ public void write(int b) throws HyracksDataException {
+ write(ByteBuffer.wrap(new byte[] { (byte) b }));
+ }
+
+ @Override
+ public int write(byte[] b, int off, int len) throws
HyracksDataException {
+ return write(ByteBuffer.wrap(b, off, len));
+ }
+
+ @Override
+ public long position() {
+ return writer.position();
+ }
+
+ @Override
+ public void finish() throws HyracksDataException {
+ fail();
+ writer.finish();
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ writer.abort();
+ }
+ }
+
private static class UnstableCloudBufferedWriter implements
ICloudBufferedWriter {
private final ICloudBufferedWriter bufferedWriter;
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 53d6546919..7bd7ad5bbf 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.logging.log4j.LogManager;
@@ -131,7 +132,7 @@ public class S3BufferedWriter implements
ICloudBufferedWriter {
try {
s3Client.completeMultipartUpload(request);
} catch (Exception e) {
- throw HyracksDataException.create(e);
+ throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION,
e);
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 89d8fd549a..3a837867c6 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -71,15 +72,15 @@ public class GCSWriter implements ICloudWriter {
while (uploadsToBeTriggered-- > 0) {
profiler.objectMultipartUpload();
}
- setUploadId();
int written = 0;
try {
+ setUploadId();
while (page.hasRemaining()) {
written += writer.write(page);
}
} catch (IOException | RuntimeException e) {
- throw HyracksDataException.create(e);
+ throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION,
e);
}
writtenBytes += written;
@@ -104,13 +105,14 @@ public class GCSWriter implements ICloudWriter {
@Override
public void finish() throws HyracksDataException {
guardian.checkWriteAccess(bucket, path);
- setUploadId();
profiler.objectMultipartUpload();
try {
+ setUploadId();
+
writer.close();
writer = null;
} catch (IOException | RuntimeException e) {
- throw HyracksDataException.create(e);
+ throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION,
e);
}
log("FINISHED");
}
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
index 1f74fb3c0a..4cf2d23e40 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
@@ -164,7 +164,10 @@ public final class ObjectSchemaNode extends
AbstractSchemaNestedNode {
public void abort(DataInputStream input, Map<AbstractSchemaNestedNode,
RunLengthIntArray> definitionLevels)
throws IOException {
- definitionLevels.put(this, new RunLengthIntArray());
+ input.readByte(); // Skip the type tag, see ObjectSchemaNode#serialize
+ if (definitionLevels != null) {
+ definitionLevels.put(this, new RunLengthIntArray());
+ }
int numberOfChildren = input.readInt();
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index 5aca3a4577..9c58247ca0 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -271,7 +271,8 @@ public class FlushColumnMetadata extends
AbstractColumnMetadata {
@Override
public void abort() throws HyracksDataException {
- DataInputStream input = new DataInputStream(new
ByteArrayInputStream(serializedMetadata.getByteArray()));
+ DataInputStream input = new DataInputStream(new
ByteArrayInputStream(serializedMetadata.getByteArray(),
+ serializedMetadata.getStartOffset(),
serializedMetadata.getLength()));
try {
abort(input);
} catch (IOException e) {
@@ -280,6 +281,7 @@ public class FlushColumnMetadata extends
AbstractColumnMetadata {
}
private void abort(DataInputStream input) throws IOException {
+ input.skipBytes(OFFSETS_SIZE);
level = -1;
repeated = 0;
changed = false;
@@ -290,6 +292,9 @@ public class FlushColumnMetadata extends
AbstractColumnMetadata {
fieldNamesDictionary.abort(input);
definitionLevels.clear();
root.abort(input, definitionLevels);
+ if (metaRoot != null) {
+ metaRoot.abort(input, definitionLevels);
+ }
}
public static void deserializeWriters(DataInput input,
List<IColumnValuesWriter> writers,
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 87aa3bd3c0..52af32909a 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -141,6 +141,17 @@ public class LSMIOOperationCallback implements
ILSMIOOperationCallback {
}
}
+ @Override
+ public void afterFailure(ILSMIOOperation operation) {
+ if (isMerge(operation)) {
+ try {
+ ioManager.delete(getOperationMaskFilePath(operation));
+ } catch (HyracksDataException e) {
+ operation.getFailure().addSuppressed(e);
+ }
+ }
+ }
+
protected void addComponentToCheckpoint(ILSMIOOperation operation) throws
HyracksDataException {
// will always update the checkpoint file even if no new component was
created
FileReference target = operation.getTarget();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index 21c3b27c70..17c5445514 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -57,6 +57,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends
AbstractLSMSecondaryI
private LSMIndexDiskComponentBulkLoader componentBulkLoader;
private int currentComponentPos = -1;
+ private boolean failed = false;
public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int
partition, RecordDescriptor inputRecDesc,
IIndexDataflowHelperFactory primaryIndexHelperFactory,
@@ -98,7 +99,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends
AbstractLSMSecondaryI
public void close() throws HyracksDataException {
HyracksDataException closeException = null;
try {
- endCurrentComponent();
+ endOrAbortCurrentComponent();
} catch (HyracksDataException e) {
closeException = e;
}
@@ -151,6 +152,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends
AbstractLSMSecondaryI
@Override
public void fail() throws HyracksDataException {
writer.fail();
+ failed = true;
}
@Override
@@ -177,15 +179,22 @@ public class LSMSecondaryIndexBulkLoadNodePushable
extends AbstractLSMSecondaryI
}
}
- private void endCurrentComponent() throws HyracksDataException {
+ private void endOrAbortCurrentComponent() throws HyracksDataException {
if (componentBulkLoader != null) {
- componentBulkLoader.end();
- componentBulkLoader = null;
+ try {
+ if (!failed) {
+ componentBulkLoader.end();
+ } else {
+ componentBulkLoader.abort();
+ }
+ } finally {
+ componentBulkLoader = null;
+ }
}
}
private void loadNewComponent(int componentPos) throws
HyracksDataException {
- endCurrentComponent();
+ endOrAbortCurrentComponent(); // This should never call
componentBulkLoader.abort()
int numTuples = getNumDeletedTuples(componentPos);
ILSMDiskComponent primaryComponent =
primaryIndex.getDiskComponents().get(componentPos);
Map<String, Object> parameters = new HashMap<>();
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7cb107dfc5..d7abf4c057 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -162,6 +162,7 @@ public enum ErrorCode implements IError {
EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA(132),
EMPTY_TYPE_INFERRED(133),
SCHEMA_LIMIT_EXCEEDED(134),
+ FAILED_IO_OPERATION(135),
// Compilation error codes.
RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index e1fbe3020c..e9569be66f 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -152,6 +152,7 @@
132 = Extra field in the result, field '%1$s' does not exist at '%2$s' in the
schema
133 = Schema could not be inferred, empty types found in the result
134 = Schema Limit exceeded, maximum number of heterogeneous schemas allowed :
'%1$s'
+135 = An IO Operation has failed
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index aa174741a6..abb03ebbff 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -386,13 +386,15 @@ public class BloomFilter {
@Override
public void end() throws HyracksDataException {
- allocateAndInitMetaDataPage();
- pageWriter.write(metaDataPage);
- for (ICachedPage p : pages) {
- pageWriter.write(p);
- }
- if (hasFailed()) {
- throw HyracksDataException.create(getFailure());
+ try {
+ allocateAndInitMetaDataPage();
+ pageWriter.write(metaDataPage);
+ for (ICachedPage p : pages) {
+ pageWriter.write(p);
+ }
+ } catch (HyracksDataException e) {
+ handleException();
+ throw e;
}
BloomFilter.this.numBits = numBits;
BloomFilter.this.numHashes = numHashes;
@@ -401,8 +403,7 @@ public class BloomFilter {
BloomFilter.this.version = BLOCKED_BLOOM_FILTER_VERSION;
}
- @Override
- public void abort() throws HyracksDataException {
+ private void handleException() {
for (ICachedPage p : pages) {
if (p != null) {
bufferCache.returnPage(p, false);
@@ -413,6 +414,11 @@ public class BloomFilter {
}
}
+ @Override
+ public void abort() throws HyracksDataException {
+ handleException();
+ }
+
@Override
public void force() throws HyracksDataException {
bufferCache.force(fileId, false);
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
index 208d3b701e..ff28464a93 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
@@ -48,6 +48,12 @@ public interface IPageManager {
*/
void close(IPageWriteFailureCallback failureCallback) throws
HyracksDataException;
+ /**
+ * Return all the pages from the page manager
+ */
+ default void returnAllPages() {
+ }
+
/**
* Create a metadata frame to be used for reading and writing to metadata
pages
*
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 0206f201d0..d606a5c649 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -61,6 +61,7 @@ public class IndexBulkLoadOperatorNodePushable extends
AbstractUnaryInputUnaryOu
protected final IIndexBulkLoader[] bulkLoaders;
protected ITupleFilter tupleFilter;
protected FrameTupleReference frameTuple;
+ private boolean failed = false;
public IndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory
indexHelperFactory, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, float fillFactor, boolean
verifyInput, long numElementsHint,
@@ -151,6 +152,7 @@ public class IndexBulkLoadOperatorNodePushable extends
AbstractUnaryInputUnaryOu
@Override
public void fail() throws HyracksDataException {
writer.fail();
+ failed = true;
}
protected void initializeBulkLoader(IIndex index, int indexId) throws
HyracksDataException {
@@ -159,12 +161,28 @@ public class IndexBulkLoadOperatorNodePushable extends
AbstractUnaryInputUnaryOu
}
private void closeBulkLoaders() throws HyracksDataException {
+ HyracksDataException failure = null;
for (IIndexBulkLoader bulkLoader : bulkLoaders) {
// bulkloader can be null if an exception is thrown before it is
initialized.
- if (bulkLoader != null) {
- bulkLoader.end();
+ try {
+ if (bulkLoader != null) {
+ if (failure == null && !failed) {
+ bulkLoader.end();
+ } else {
+ bulkLoader.abort();
+ }
+ }
+ } catch (HyracksDataException e) {
+ if (failure == null) {
+ failure = e;
+ } else {
+ failure.addSuppressed(e);
+ }
}
}
+ if (failure != null) {
+ throw failure;
+ }
}
protected static void closeIndexes(IIndex[] indexes,
IIndexDataflowHelper[] indexHelpers,
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index dae01bfe88..9c51d92202 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -152,6 +152,13 @@ public class AppendOnlyLinkedMetadataPageManager
implements IMetadataPageManager
metadataPages.clear();
}
+ @Override
+ public void returnAllPages() {
+ for (ICachedPage page : metadataPages) {
+ bufferCache.returnPage(page, false);
+ }
+ }
+
/**
* For storage on append-only media (such as HDFS), the meta data page has
to be written last.
* However, some implementations still write the meta data to the front.
To deal with this as well
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
index d5f836c51b..5ef3aab8b2 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
@@ -130,6 +130,7 @@ public abstract class AbstractTreeIndexBulkLoader extends
PageWriteFailureCallba
}
}
releasedLatches = true;
+ freePageManager.returnAllPages();
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 05e78dd44d..3526d2f144 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -238,73 +238,83 @@ public class LSMBTree extends AbstractLSMIndex implements
ITreeIndex {
LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation;
LSMBTreeMemoryComponent flushingComponent = (LSMBTreeMemoryComponent)
flushOp.getFlushingComponent();
IIndexAccessor accessor =
flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
- ILSMDiskComponent component;
- ILSMDiskComponentBulkLoader componentBulkLoader;
+ ILSMDiskComponent component = null;
+ ILSMDiskComponentBulkLoader componentBulkLoader = null;
try {
- RangePredicate nullPred = new RangePredicate(null, null, true,
true, null, null);
- long numElements = 0L;
- if (hasBloomFilter) {
- //count elements in btree for creating Bloomfilter
- IIndexCursor countingCursor = ((BTreeAccessor)
accessor).createCountingSearchCursor();
- accessor.search(countingCursor, nullPred);
+ try {
+ RangePredicate nullPred = new RangePredicate(null, null, true,
true, null, null);
+ long numElements = 0L;
+ if (hasBloomFilter) {
+ //count elements in btree for creating Bloomfilter
+ IIndexCursor countingCursor = ((BTreeAccessor)
accessor).createCountingSearchCursor();
+ accessor.search(countingCursor, nullPred);
+ try {
+ while (countingCursor.hasNext()) {
+ countingCursor.next();
+ ITupleReference countTuple =
countingCursor.getTuple();
+ numElements =
IntegerPointable.getInteger(countTuple.getFieldData(0),
+ countTuple.getFieldStart(0));
+ }
+ } finally {
+ try {
+ countingCursor.close();
+ } finally {
+ countingCursor.destroy();
+ }
+ }
+ }
+ component = createDiskComponent(componentFactory,
flushOp.getTarget(), null,
+ flushOp.getBloomFilterTarget(), true);
+ componentBulkLoader = component.createBulkLoader(operation,
1.0f, false, numElements, false, false,
+ false,
pageWriteCallbackFactory.createPageWriteCallback());
+ IIndexCursor scanCursor = accessor.createSearchCursor(false);
+ accessor.search(scanCursor, nullPred);
try {
- while (countingCursor.hasNext()) {
- countingCursor.next();
- ITupleReference countTuple = countingCursor.getTuple();
- numElements =
-
IntegerPointable.getInteger(countTuple.getFieldData(0),
countTuple.getFieldStart(0));
+ while (scanCursor.hasNext()) {
+ scanCursor.next();
+ // we can safely throw away updated tuples in
secondary BTree components, because they correspond to
+ // deleted tuples
+ if (updateAware && ((LSMBTreeTupleReference)
scanCursor.getTuple()).isUpdated()) {
+ continue;
+ }
+ componentBulkLoader.add(scanCursor.getTuple());
}
} finally {
try {
- countingCursor.close();
+ scanCursor.close();
} finally {
- countingCursor.destroy();
+ scanCursor.destroy();
}
}
+ } finally {
+ accessor.destroy();
}
- component = createDiskComponent(componentFactory,
flushOp.getTarget(), null, flushOp.getBloomFilterTarget(),
- true);
- componentBulkLoader = component.createBulkLoader(operation, 1.0f,
false, numElements, false, false, false,
- pageWriteCallbackFactory.createPageWriteCallback());
- IIndexCursor scanCursor = accessor.createSearchCursor(false);
- accessor.search(scanCursor, nullPred);
+ if (component.getLSMComponentFilter() != null) {
+ List<ITupleReference> filterTuples = new ArrayList<>();
+
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
+
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
+
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
+
getFilterManager().writeFilter(component.getLSMComponentFilter(),
component.getMetadataHolder());
+ }
+ // Write metadata from memory component to disk
+ // Q. what about the merge operation? how do we resolve conflicts
+ // A. Through providing an appropriate ILSMIOOperationCallback
+ // Must not reset the metadata before the flush is completed
+ // Use the copy of the metadata in the opContext
+ // TODO This code should be in the callback and not in the index
+ flushingComponent.getMetadata().copy(component.getMetadata());
+ componentBulkLoader.end();
+ } catch (Throwable e) {
try {
- while (scanCursor.hasNext()) {
- scanCursor.next();
- // we can safely throw away updated tuples in secondary
BTree components, because they correspond to
- // deleted tuples
- if (updateAware && ((LSMBTreeTupleReference)
scanCursor.getTuple()).isUpdated()) {
- continue;
- }
- componentBulkLoader.add(scanCursor.getTuple());
- }
- } finally {
- try {
- scanCursor.close();
- } finally {
- scanCursor.destroy();
+ if (componentBulkLoader != null) {
+ componentBulkLoader.abort();
}
+ } catch (Throwable th) {
+ e.addSuppressed(th);
}
- } finally {
- accessor.destroy();
- }
- if (component.getLSMComponentFilter() != null) {
- List<ITupleReference> filterTuples = new ArrayList<>();
-
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
-
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
- getFilterManager().updateFilter(component.getLSMComponentFilter(),
filterTuples,
- NoOpOperationCallback.INSTANCE);
- getFilterManager().writeFilter(component.getLSMComponentFilter(),
component.getMetadataHolder());
+ throw e;
}
- // Write metadata from memory component to disk
- // Q. what about the merge operation? how do we resolve conflicts
- // A. Through providing an appropriate ILSMIOOperationCallback
- // Must not reset the metadata before the flush is completed
- // Use the copy of the metadata in the opContext
- // TODO This code should be in the callback and not in the index
- flushingComponent.getMetadata().copy(component.getMetadata());
-
- componentBulkLoader.end();
return component;
}
@@ -313,7 +323,7 @@ public class LSMBTree extends AbstractLSMIndex implements
ITreeIndex {
public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws
HyracksDataException {
LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
IIndexCursor cursor = mergeOp.getCursor();
- ILSMDiskComponent mergedComponent;
+ ILSMDiskComponent mergedComponent = null;
ILSMDiskComponentBulkLoader componentBulkLoader = null;
try {
try {
@@ -349,6 +359,7 @@ public class LSMBTree extends AbstractLSMIndex implements
ITreeIndex {
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(),
mergedComponent.getMetadataHolder());
}
+ componentBulkLoader.end();
} catch (Throwable e) { // NOSONAR.. As per the contract, we should
either abort or end
try {
if (componentBulkLoader != null) {
@@ -359,7 +370,6 @@ public class LSMBTree extends AbstractLSMIndex implements
ITreeIndex {
}
throw e;
}
- componentBulkLoader.end();
return mergedComponent;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
index db36c72a99..c77d7ff884 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
@@ -47,8 +47,13 @@ public abstract class
AbstractLSMWithBloomFilterDiskComponent extends AbstractLS
public void markAsValid(boolean persist, IPageWriteFailureCallback
callback) throws HyracksDataException {
// The order of forcing the dirty page to be flushed is critical. The
// bloom filter must be always done first.
- ComponentUtils.markAsValid(getBloomFilterBufferCache(),
getBloomFilter(), persist);
- super.markAsValid(persist, callback);
+ try {
+ ComponentUtils.markAsValid(getBloomFilterBufferCache(),
getBloomFilter(), persist);
+ super.markAsValid(persist, callback);
+ } catch (HyracksDataException ex) {
+ returnPages();
+ throw ex;
+ }
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
index fda48f04c3..158a3b8871 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
@@ -40,8 +40,19 @@ public abstract class AbstractLSMWithBuddyDiskComponent
extends AbstractLSMWithB
@Override
public void markAsValid(boolean persist, IPageWriteFailureCallback
callback) throws HyracksDataException {
- super.markAsValid(persist, callback);
- ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
+ try {
+ super.markAsValid(persist, callback);
+ ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
+ } catch (HyracksDataException ex) {
+ returnPages();
+ throw ex;
+ }
+ }
+
+ @Override
+ public void returnPages() {
+ getBuddyIndex().getPageManager().returnAllPages();
+ super.returnPages();
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 029a59de63..e41a17efa7 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -137,4 +137,9 @@ public interface ILSMDiskComponent extends ILSMComponent {
ILSMDiskComponentBulkLoader createBulkLoader(ILSMIOOperation operation,
float fillFactor, boolean verifyInput,
long numElementsHint, boolean checkIfEmptyIndex, boolean
withFilter, boolean cleanupEmptyComponent,
IPageWriteCallback callback) throws HyracksDataException;
+
+ /**
+ * Returns all pages of the component to the buffer cache
+ */
+ void returnPages();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
index a778a4c1ee..542cf47b6a 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
@@ -58,6 +58,12 @@ public interface ILSMIOOperationCallback {
*/
void afterFinalize(ILSMIOOperation operation) throws HyracksDataException;
+ /**
+ * This method is called on an IO operation after the operation fails
+ */
+ default void afterFailure(ILSMIOOperation operation) {
+ }
+
/**
* This method is called after the schduler is done with the IO operation
* For operation that are not scheduled, this call is skipped
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index baf0931319..c7240f34b3 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -158,8 +158,18 @@ public abstract class AbstractLSMDiskComponent extends
AbstractLSMComponent impl
*/
@Override
public void markAsValid(boolean persist, IPageWriteFailureCallback
callback) throws HyracksDataException {
- ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
- LOGGER.debug("marked {} as valid component with id {}", getIndex(),
getId());
+ try {
+ ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
+ LOGGER.debug("marked {} as valid component with id {}",
getIndex(), getId());
+ } catch (Exception e) {
+ returnPages();
+ throw e;
+ }
+ }
+
+ @Override
+ public void returnPages() {
+ ComponentUtils.returnPages(getMetadataHolder());
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 4f7c624996..3f60978c83 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -583,7 +583,12 @@ public abstract class AbstractLSMIndex implements
ILSMIndex {
throws HyracksDataException {
ILSMDiskComponent component = factory.createComponent(this,
new LSMComponentFileReferences(insertFileReference,
deleteIndexFileReference, bloomFilterFileRef));
- component.activate(createComponent);
+ try {
+ component.activate(createComponent);
+ } catch (HyracksDataException e) {
+ component.returnPages();
+ throw e;
+ }
return component;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
index 0214712c27..f90da79d53 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
@@ -127,9 +127,20 @@ public class ChainedLSMDiskComponentBulkLoader implements
ILSMDiskComponentBulkL
@Override
public void abort() throws HyracksDataException {
operation.setStatus(LSMIOOperationStatus.FAILURE);
- final int bulkloadersCount = bulkloaderChain.size();
- for (int i = 0; i < bulkloadersCount; i++) {
- bulkloaderChain.get(i).abort();
+ HyracksDataException failure = null;
+ for (IChainedComponentBulkLoader componentBulkLoader :
bulkloaderChain) {
+ try {
+ componentBulkLoader.abort();
+ } catch (HyracksDataException e) {
+ if (failure == null) {
+ failure = e;
+ } else {
+ failure.addSuppressed(e);
+ }
+ }
+ }
+ if (failure != null) {
+ throw failure;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index 9a112ee4d5..0dfb12ef27 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -157,4 +157,9 @@ public class EmptyComponent implements ILSMDiskComponent {
public String toString() {
return "EmptyComponent";
}
+
+ @Override
+ public void returnPages() {
+ // Do nothing
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
index 9346b56a80..392872477c 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
@@ -21,7 +21,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
import org.apache.hyracks.api.io.FileReference;
-public final class LSMComponentFileReferences {
+public class LSMComponentFileReferences {
// The FileReference for the index that is used for inserting records of
the component. For instance, this will be the FileReference of the RTree in one
component of the LSM-RTree.
private final FileReference insertIndexFileReference;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index d019a088a1..69c6ea63b2 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -266,7 +266,7 @@ public class LSMHarness implements ILSMHarness {
if (inactiveMemoryComponentsToBeCleanedUp != null) {
cleanupInactiveMemoryComponents(inactiveMemoryComponentsToBeCleanedUp);
}
- if (opType == LSMOperationType.FLUSH) {
+ if (opType == LSMOperationType.FLUSH && !failedOperation) {
ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent)
ctx.getComponentHolder().get(0);
// We must call flushed without synchronizing on opTracker to
avoid deadlocks
flushingComponent.flushed();
@@ -575,6 +575,7 @@ public class LSMHarness implements ILSMHarness {
// if the operation failed, we need to cleanup files
if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
operation.cleanup(lsmIndex.getBufferCache());
+ operation.getCallback().afterFailure(operation);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 3e17cb040d..88fa10546d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -53,7 +53,7 @@ public class LSMIndexDiskComponentBulkLoader implements
IIndexBulkLoader {
try {
componentBulkLoader.add(tuple);
} catch (Throwable th) {
- opCtx.getIoOperation().setFailure(th);
+ fail(th);
throw th;
}
}
@@ -63,7 +63,7 @@ public class LSMIndexDiskComponentBulkLoader implements
IIndexBulkLoader {
try {
componentBulkLoader.delete(tuple);
} catch (Throwable th) {
- opCtx.getIoOperation().setFailure(th);
+ fail(th);
throw th;
}
}
@@ -83,6 +83,7 @@ public class LSMIndexDiskComponentBulkLoader implements
IIndexBulkLoader {
@Override
public void abort() throws HyracksDataException {
opCtx.getIoOperation().setStatus(LSMIOOperationStatus.FAILURE);
+ fail(null);
try {
try {
componentBulkLoader.abort();
@@ -115,6 +116,7 @@ public class LSMIndexDiskComponentBulkLoader implements
IIndexBulkLoader {
componentBulkLoader.end();
} catch (Throwable th) { // NOSONAR Must not call afterFinalize
without setting failure
fail(th);
+ componentBulkLoader.abort();
throw th;
} finally {
lsmIndex.getIOOperationCallback().afterFinalize(opCtx.getIoOperation());
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMInvertedComponentFileReferences.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMInvertedComponentFileReferences.java
new file mode 100644
index 0000000000..7d10c13b20
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMInvertedComponentFileReferences.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.io.FileReference;
+
+public class LSMInvertedComponentFileReferences extends
LSMComponentFileReferences {
+
+ private final FileReference invListsFileReference;
+
+ public LSMInvertedComponentFileReferences(FileReference
insertIndexFileReference,
+ FileReference deleteIndexFileReference, FileReference
bloomFilterFileReference,
+ FileReference invListsFileReference) {
+ super(insertIndexFileReference, deleteIndexFileReference,
bloomFilterFileReference);
+ this.invListsFileReference = invListsFileReference;
+ }
+
+ public FileReference getInvListsFileReference() {
+ return invListsFileReference;
+ }
+
+ @Override
+ public FileReference[] getFileReferences() {
+ return new FileReference[] { getInsertIndexFileReference(),
getDeleteIndexFileReference(),
+ getBloomFilterFileReference(), invListsFileReference };
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index c768768c0a..52bd07ed8b 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -41,6 +41,8 @@ import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.util.ExponentialRetryPolicy;
+import org.apache.hyracks.util.IRetryPolicy;
public class LSMTreeIndexAccessor implements ILSMIndexAccessor {
@FunctionalInterface
@@ -122,6 +124,18 @@ public class LSMTreeIndexAccessor implements
ILSMIndexAccessor {
@Override
public void flush(ILSMIOOperation operation) throws HyracksDataException {
lsmHarness.flush(operation);
+ if (operation.getStatus() ==
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
+ IRetryPolicy policy = new ExponentialRetryPolicy();
+ while (operation.getStatus() ==
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
+ if (policy.retry(operation.getFailure())) {
+ operation.setFailure(null);
+
operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+ lsmHarness.flush(operation);
+ } else {
+ break;
+ }
+ }
+ }
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
index 842ec61527..a8cbfdd400 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
@@ -169,6 +169,22 @@ public class ComponentUtils {
}
}
+ public static void returnPages(ITreeIndex treeIndex) {
+ treeIndex.getPageManager().returnAllPages();
+ IBufferCache bufferCache = treeIndex.getBufferCache();
+ // We need to return all pages to the buffer cache in case of a failure
+ try {
+ bufferCache.getCompressedPageWriter(treeIndex.getFileId()).abort();
+ } catch (IllegalStateException | NullPointerException ignored) {
+ // Since we call this method in multiple places, it is possible
that the writer
+ // is not in the State.WRITABLE, which would throw an
IllegalStateException.
+ // This means the writer has already written all the pages.
+ //
+ // We also catch NullPointerException in case the writer is not
initialized.
+ // Or if the compressed page writer is not applicable to this case
+ }
+ }
+
public static void markAsValid(IBufferCache bufferCache, BloomFilter
filter, boolean forceToDisk)
throws HyracksDataException {
if (forceToDisk) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index a828d229e1..f98a05629e 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -18,6 +18,9 @@
*/
package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+import static
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.DELIMITER;
+import static
org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager.INVLISTS_SUFFIX;
+
import java.util.ArrayList;
import java.util.List;
@@ -286,46 +289,56 @@ public class LSMInvertedIndex extends AbstractLSMIndex
implements IInvertedIndex
// Create a scan cursor on the deleted keys BTree underlying the
in-memory inverted index.
IIndexCursor deletedKeysScanCursor =
deletedKeysBTreeAccessor.createSearchCursor(false);
try {
- deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred);
try {
- while (deletedKeysScanCursor.hasNext()) {
- deletedKeysScanCursor.next();
-
componentBulkLoader.delete(deletedKeysScanCursor.getTuple());
+ deletedKeysBTreeAccessor.search(deletedKeysScanCursor,
nullPred);
+ try {
+ while (deletedKeysScanCursor.hasNext()) {
+ deletedKeysScanCursor.next();
+
componentBulkLoader.delete(deletedKeysScanCursor.getTuple());
+ }
+ } finally {
+ deletedKeysScanCursor.close();
}
} finally {
- deletedKeysScanCursor.close();
+ deletedKeysScanCursor.destroy();
}
- } finally {
- deletedKeysScanCursor.destroy();
- }
- // Scan the in-memory inverted index
- InMemoryInvertedIndexAccessor memInvIndexAccessor =
-
flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
- BTreeAccessor memBTreeAccessor =
memInvIndexAccessor.getBTreeAccessor();
- IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false);
- try {
- memBTreeAccessor.search(scanCursor, nullPred);
- // Bulk load the disk inverted index from the in-memory inverted
index.
+ // Scan the in-memory inverted index
+ InMemoryInvertedIndexAccessor memInvIndexAccessor =
+
flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ BTreeAccessor memBTreeAccessor =
memInvIndexAccessor.getBTreeAccessor();
+ IIndexCursor scanCursor =
memBTreeAccessor.createSearchCursor(false);
try {
- while (scanCursor.hasNext()) {
- scanCursor.next();
- componentBulkLoader.add(scanCursor.getTuple());
+ memBTreeAccessor.search(scanCursor, nullPred);
+ // Bulk load the disk inverted index from the in-memory
inverted index.
+ try {
+ while (scanCursor.hasNext()) {
+ scanCursor.next();
+ componentBulkLoader.add(scanCursor.getTuple());
+ }
+ } finally {
+ scanCursor.close();
}
} finally {
- scanCursor.close();
+ scanCursor.destroy();
}
- } finally {
- scanCursor.destroy();
- }
- if (component.getLSMComponentFilter() != null) {
- List<ITupleReference> filterTuples = new ArrayList<>();
-
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
-
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
- filterManager.updateFilter(component.getLSMComponentFilter(),
filterTuples, NoOpOperationCallback.INSTANCE);
- filterManager.writeFilter(component.getLSMComponentFilter(),
component.getMetadataHolder());
+ if (component.getLSMComponentFilter() != null) {
+ List<ITupleReference> filterTuples = new ArrayList<>();
+
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
+
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
+ filterManager.updateFilter(component.getLSMComponentFilter(),
filterTuples,
+ NoOpOperationCallback.INSTANCE);
+ filterManager.writeFilter(component.getLSMComponentFilter(),
component.getMetadataHolder());
+ }
+ flushingComponent.getMetadata().copy(component.getMetadata());
+ componentBulkLoader.end();
+ } catch (Throwable e) {
+ try {
+ componentBulkLoader.abort();
+ } catch (Throwable th) {
+ e.addSuppressed(th);
+ }
+ throw e;
}
- flushingComponent.getMetadata().copy(component.getMetadata());
- componentBulkLoader.end();
return component;
}
@@ -339,60 +352,71 @@ public class LSMInvertedIndex extends AbstractLSMIndex
implements IInvertedIndex
// Create an inverted index instance.
ILSMDiskComponent component = createDiskComponent(componentFactory,
mergeOp.getTarget(),
mergeOp.getDeletedKeysBTreeTarget(),
mergeOp.getBloomFilterTarget(), true);
- ILSMDiskComponentBulkLoader componentBulkLoader;
+ ILSMDiskComponentBulkLoader componentBulkLoader = null;
// In case we must keep the deleted-keys BTrees, then they must be
merged *before* merging the inverted
// indexes so that lsmHarness.endSearch() is called once when the
inverted indexes have been merged.
- if
(mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1)
!= diskComponents
- .get(diskComponents.size() - 1)) {
- // Keep the deleted tuples since the oldest disk component is not
included in the merge operation
- LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor =
- new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx,
mergeOp.getCursorStats());
- try {
- long numElements = 0L;
- for (int i = 0; i < mergeOp.getMergingComponents().size();
++i) {
- numElements += ((LSMInvertedIndexDiskComponent)
mergeOp.getMergingComponents().get(i))
- .getBloomFilter().getNumElements();
- }
- componentBulkLoader = component.createBulkLoader(operation,
1.0f, false, numElements, false, false,
- false,
pageWriteCallbackFactory.createPageWriteCallback());
- loadDeleteTuples(opCtx, btreeCursor, mergePred,
componentBulkLoader);
- } finally {
- btreeCursor.destroy();
- }
- } else {
- componentBulkLoader = component.createBulkLoader(operation, 1.0f,
false, 0L, false, false, false,
- pageWriteCallbackFactory.createPageWriteCallback());
- }
- search(opCtx, cursor, mergePred);
try {
- while (cursor.hasNext()) {
- cursor.next();
- componentBulkLoader.add(cursor.getTuple());
+ if
(mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1)
!= diskComponents
+ .get(diskComponents.size() - 1)) {
+ // Keep the deleted tuples since the oldest disk component is
not included in the merge operation
+ LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor =
+ new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx,
mergeOp.getCursorStats());
+ try {
+ long numElements = 0L;
+ for (int i = 0; i < mergeOp.getMergingComponents().size();
++i) {
+ numElements += ((LSMInvertedIndexDiskComponent)
mergeOp.getMergingComponents().get(i))
+ .getBloomFilter().getNumElements();
+ }
+ componentBulkLoader =
component.createBulkLoader(operation, 1.0f, false, numElements, false, false,
+ false,
pageWriteCallbackFactory.createPageWriteCallback());
+ loadDeleteTuples(opCtx, btreeCursor, mergePred,
componentBulkLoader);
+ } finally {
+ btreeCursor.destroy();
+ }
+ } else {
+ componentBulkLoader = component.createBulkLoader(operation,
1.0f, false, 0L, false, false, false,
+ pageWriteCallbackFactory.createPageWriteCallback());
}
- } finally {
+ search(opCtx, cursor, mergePred);
try {
- cursor.close();
+ while (cursor.hasNext()) {
+ cursor.next();
+ componentBulkLoader.add(cursor.getTuple());
+ }
} finally {
- cursor.destroy();
+ try {
+ cursor.close();
+ } finally {
+ cursor.destroy();
+ }
}
- }
- if (component.getLSMComponentFilter() != null) {
- List<ITupleReference> filterTuples = new ArrayList<>();
- for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
- ITupleReference min =
mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple();
- ITupleReference max =
mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple();
- if (min != null) {
- filterTuples.add(min);
+ if (component.getLSMComponentFilter() != null) {
+ List<ITupleReference> filterTuples = new ArrayList<>();
+ for (int i = 0; i < mergeOp.getMergingComponents().size();
++i) {
+ ITupleReference min =
mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple();
+ ITupleReference max =
mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple();
+ if (min != null) {
+ filterTuples.add(min);
+ }
+ if (max != null) {
+ filterTuples.add(max);
+ }
}
- if (max != null) {
- filterTuples.add(max);
+
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
+
getFilterManager().writeFilter(component.getLSMComponentFilter(),
component.getMetadataHolder());
+ }
+ componentBulkLoader.end();
+ } catch (Throwable e) {
+ try {
+ if (componentBulkLoader != null) {
+ componentBulkLoader.abort();
}
+ } catch (Throwable th) {
+ e.addSuppressed(th);
}
- getFilterManager().updateFilter(component.getLSMComponentFilter(),
filterTuples,
- NoOpOperationCallback.INSTANCE);
- getFilterManager().writeFilter(component.getLSMComponentFilter(),
component.getMetadataHolder());
+ throw e;
}
- componentBulkLoader.end();
return component;
}
@@ -487,7 +511,10 @@ public class LSMInvertedIndex extends AbstractLSMIndex
implements IInvertedIndex
throws HyracksDataException {
return new LSMInvertedIndexFlushOperation(new
LSMInvertedIndexAccessor(getHarness(), opCtx),
componentFileRefs.getInsertIndexFileReference(),
componentFileRefs.getDeleteIndexFileReference(),
- componentFileRefs.getBloomFilterFileReference(), callback,
getIndexIdentifier());
+ componentFileRefs.getBloomFilterFileReference(),
+ ioManager.resolve(
+
getInvListsFilePath(componentFileRefs.getInsertIndexFileReference().getAbsolutePath())),
+ callback, getIndexIdentifier());
}
@Override
@@ -497,7 +524,14 @@ public class LSMInvertedIndex extends AbstractLSMIndex
implements IInvertedIndex
IIndexCursorStats stats = new IndexCursorStats();
IIndexCursor cursor = new LSMInvertedIndexMergeCursor(opCtx, stats);
return new LSMInvertedIndexMergeOperation(accessor, cursor, stats,
mergeFileRefs.getInsertIndexFileReference(),
- mergeFileRefs.getDeleteIndexFileReference(),
mergeFileRefs.getBloomFilterFileReference(), callback,
- getIndexIdentifier());
+ mergeFileRefs.getDeleteIndexFileReference(),
mergeFileRefs.getBloomFilterFileReference(),
+
ioManager.resolve(getInvListsFilePath(mergeFileRefs.getInsertIndexFileReference().getAbsolutePath())),
+ callback, getIndexIdentifier());
+ }
+
+ public String getInvListsFilePath(String dictBTreeFilePath) {
+ int index = dictBTreeFilePath.lastIndexOf(DELIMITER);
+ String file = dictBTreeFilePath.substring(0, index);
+ return file + DELIMITER + INVLISTS_SUFFIX;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 67312ceb23..d8900ad2af 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -37,6 +37,8 @@ import
org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccesso
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.util.ExponentialRetryPolicy;
+import org.apache.hyracks.util.IRetryPolicy;
public class LSMInvertedIndexAccessor implements ILSMIndexAccessor,
IInvertedIndexAccessor {
@@ -93,6 +95,18 @@ public class LSMInvertedIndexAccessor implements
ILSMIndexAccessor, IInvertedInd
@Override
public void flush(ILSMIOOperation operation) throws HyracksDataException {
lsmHarness.flush(operation);
+ if (operation.getStatus() ==
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
+ IRetryPolicy policy = new ExponentialRetryPolicy();
+ while (operation.getStatus() ==
ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
+ if (policy.retry(operation.getFailure())) {
+ operation.setFailure(null);
+
operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+ lsmHarness.flush(operation);
+ } else {
+ break;
+ }
+ }
+ }
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 6de6ab5b64..3edb5feead 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -105,17 +105,22 @@ public class LSMInvertedIndexDiskComponent extends
AbstractLSMWithBuddyDiskCompo
@Override
public void markAsValid(boolean persist, IPageWriteFailureCallback
callback) throws HyracksDataException {
- ComponentUtils.markAsValid(getBloomFilterBufferCache(),
getBloomFilter(), persist);
-
- // Flush inverted index second.
- invIndex.getBufferCache().force((invIndex).getInvListsFileId(), true);
- ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
- if (!callback.hasFailed()) {
- // Flush deleted keys BTree.
- ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
- }
- if (callback.hasFailed()) {
- throw HyracksDataException.create(callback.getFailure());
+ try {
+ ComponentUtils.markAsValid(getBloomFilterBufferCache(),
getBloomFilter(), persist);
+
+ // Flush inverted index second.
+ invIndex.getBufferCache().force((invIndex).getInvListsFileId(),
true);
+ ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
+ if (!callback.hasFailed()) {
+ // Flush deleted keys BTree.
+ ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
+ }
+ if (callback.hasFailed()) {
+ throw HyracksDataException.create(callback.getFailure());
+ }
+ } catch (HyracksDataException ex) {
+ returnPages();
+ throw ex;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 3b8f82a0a0..442df0b39a 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -34,6 +34,7 @@ import
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManage
import org.apache.hyracks.storage.am.lsm.common.impls.BTreeFactory;
import
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import
org.apache.hyracks.storage.am.lsm.common.impls.LSMInvertedComponentFileReferences;
import
org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper;
// TODO: Refactor for better code sharing with other file managers.
@@ -59,17 +60,19 @@ public class LSMInvertedIndexFileManager extends
AbstractLSMIndexFileManager imp
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws
HyracksDataException {
String baseName = getNextComponentSequence(deletedKeysBTreeFilter);
- return new LSMComponentFileReferences(baseDir.getChild(baseName +
DELIMITER + DICT_BTREE_SUFFIX),
+ return new
LSMInvertedComponentFileReferences(baseDir.getChild(baseName + DELIMITER +
DICT_BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER +
DELETED_KEYS_BTREE_SUFFIX),
- baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
+ baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX),
+ baseDir.getChild(baseName + DELIMITER + INVLISTS_SUFFIX));
}
@Override
public LSMComponentFileReferences getRelMergeFileReference(String
firstFileName, String lastFileName) {
final String baseName =
IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
- return new LSMComponentFileReferences(baseDir.getChild(baseName +
DELIMITER + DICT_BTREE_SUFFIX),
+ return new
LSMInvertedComponentFileReferences(baseDir.getChild(baseName + DELIMITER +
DICT_BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER +
DELETED_KEYS_BTREE_SUFFIX),
- baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
+ baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX),
+ baseDir.getChild(baseName + DELIMITER + INVLISTS_SUFFIX));
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index 5b272eeea4..efd5060f91 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -24,17 +24,20 @@ import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import
org.apache.hyracks.storage.am.lsm.common.impls.LSMInvertedComponentFileReferences;
public class LSMInvertedIndexFlushOperation extends FlushOperation {
private final FileReference deletedKeysBTreeFlushTarget;
private final FileReference bloomFilterFlushTarget;
+ private final FileReference invListsFlushTarget;
public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor,
FileReference flushTarget,
FileReference deletedKeysBTreeFlushTarget, FileReference
bloomFilterFlushTarget,
- ILSMIOOperationCallback callback, String indexIdentifier) {
+ FileReference invListsFlushTarget, ILSMIOOperationCallback
callback, String indexIdentifier) {
super(accessor, flushTarget, callback, indexIdentifier);
this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
+ this.invListsFlushTarget = invListsFlushTarget;
}
public FileReference getDeletedKeysBTreeTarget() {
@@ -47,6 +50,7 @@ public class LSMInvertedIndexFlushOperation extends
FlushOperation {
@Override
public LSMComponentFileReferences getComponentFiles() {
- return new LSMComponentFileReferences(target,
deletedKeysBTreeFlushTarget, bloomFilterFlushTarget);
+ return new LSMInvertedComponentFileReferences(target,
deletedKeysBTreeFlushTarget, bloomFilterFlushTarget,
+ invListsFlushTarget);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 31c79f0a62..4ebb9d3cc1 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -23,6 +23,7 @@ import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import
org.apache.hyracks.storage.am.lsm.common.impls.LSMInvertedComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.IIndexCursorStats;
@@ -30,13 +31,15 @@ import org.apache.hyracks.storage.common.IIndexCursorStats;
public class LSMInvertedIndexMergeOperation extends MergeOperation {
private final FileReference deletedKeysBTreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
+ private final FileReference invListsMergeTarget;
public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor,
IIndexCursor cursor, IIndexCursorStats stats,
FileReference target, FileReference deletedKeysBTreeMergeTarget,
FileReference bloomFilterMergeTarget,
- ILSMIOOperationCallback callback, String indexIdentifier) {
+ FileReference invListsMergeTarget, ILSMIOOperationCallback
callback, String indexIdentifier) {
super(accessor, target, callback, indexIdentifier, cursor, stats);
this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
+ this.invListsMergeTarget = invListsMergeTarget;
}
public FileReference getDeletedKeysBTreeTarget() {
@@ -49,7 +52,8 @@ public class LSMInvertedIndexMergeOperation extends
MergeOperation {
@Override
public LSMComponentFileReferences getComponentFiles() {
- return new LSMComponentFileReferences(target,
deletedKeysBTreeMergeTarget, bloomFilterMergeTarget);
+ return new LSMInvertedComponentFileReferences(target,
deletedKeysBTreeMergeTarget, bloomFilterMergeTarget,
+ invListsMergeTarget);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 2583d0ab69..47068a30fe 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -397,6 +397,9 @@ public class OnDiskInvertedIndex implements
IInPlaceInvertedIndex {
if (btreeBulkloader != null) {
btreeBulkloader.abort();
}
+ if (currentPage != null) {
+ bufferCache.returnPage(currentPage, false);
+ }
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index a8a190988e..8b88e7ee51 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -110,7 +110,6 @@ public class LSMRTree extends AbstractLSMRTree {
RangePredicate btreeNullPredicate = new RangePredicate(null, null,
true, true, null, null);
BTreeAccessor memBTreeAccessor =
flushingComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
- boolean abort = true;
try {
try {
rTreeTupleSorter = getRTreeTupleSorter(flushingComponent,
memBTreeAccessor, btreeNullPredicate,
@@ -142,12 +141,16 @@ public class LSMRTree extends AbstractLSMRTree {
}
// Note. If we change the filter to write to metadata object, we
don't need the if block above
flushingComponent.getMetadata().copy(component.getMetadata());
- abort = false;
componentBulkLoader.end();
- } finally {
- if (abort && componentBulkLoader != null) {
- componentBulkLoader.abort();
+ } catch (Throwable e) {
+ try {
+ if (componentBulkLoader != null) {
+ componentBulkLoader.abort();
+ }
+ } catch (Throwable th) {
+ e.addSuppressed(th);
}
+ throw e;
}
return component;
}
@@ -264,32 +267,35 @@ public class LSMRTree extends AbstractLSMRTree {
IIndexCursor cursor = mergeOp.getCursor();
ILSMDiskComponentBulkLoader componentBulkLoader = null;
ILSMDiskComponent mergedComponent = null;
- boolean abort = true;
try {
- mergedComponent = createDiskComponent(componentFactory,
mergeOp.getTarget(), mergeOp.getBTreeTarget(),
- mergeOp.getBloomFilterTarget(), true);
- componentBulkLoader = loadMergeBulkLoader(mergeOp, cursor,
mergedComponent);
- if (mergedComponent.getLSMComponentFilter() != null) {
- List<ITupleReference> filterTuples = new ArrayList<>();
- for (int i = 0; i < mergeOp.getMergingComponents().size();
++i) {
-
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
-
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
+ try {
+ mergedComponent = createDiskComponent(componentFactory,
mergeOp.getTarget(), mergeOp.getBTreeTarget(),
+ mergeOp.getBloomFilterTarget(), true);
+ componentBulkLoader = loadMergeBulkLoader(mergeOp, cursor,
mergedComponent);
+ if (mergedComponent.getLSMComponentFilter() != null) {
+ List<ITupleReference> filterTuples = new ArrayList<>();
+ for (int i = 0; i < mergeOp.getMergingComponents().size();
++i) {
+
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
+
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
+ }
+
getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(),
filterTuples,
+ NoOpOperationCallback.INSTANCE);
+
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(),
+ mergedComponent.getMetadataHolder());
}
-
getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(),
filterTuples,
- NoOpOperationCallback.INSTANCE);
-
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(),
- mergedComponent.getMetadataHolder());
+ componentBulkLoader.end();
+ } finally {
+ cursor.destroy();
}
- abort = false;
- componentBulkLoader.end();
- } finally {
+ } catch (Throwable e) {
try {
- cursor.destroy();
- } finally {
- if (abort && componentBulkLoader != null) {
+ if (componentBulkLoader != null) {
componentBulkLoader.abort();
}
+ } catch (Throwable th) {
+ e.addSuppressed(th);
}
+ throw e;
}
return mergedComponent;
}
@@ -297,46 +303,38 @@ public class LSMRTree extends AbstractLSMRTree {
private ILSMDiskComponentBulkLoader
loadMergeBulkLoader(LSMRTreeMergeOperation mergeOp, IIndexCursor cursor,
ILSMDiskComponent mergedComponent) throws HyracksDataException {
ILSMDiskComponentBulkLoader componentBulkLoader = null;
- boolean abort = true;
ISearchPredicate rtreeSearchPred = new SearchPredicate(null, null);
ILSMIndexOperationContext opCtx = ((LSMRTreeSortedCursor)
cursor).getOpCtx();
search(opCtx, cursor, rtreeSearchPred);
try {
- try {
- // In case we must keep the deleted-keys BTrees, then they
must be merged
- // *before* merging the r-trees so that
- // lsmHarness.endSearch() is called once when the r-trees have
been merged.
- if
(mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1)
!= diskComponents
- .get(diskComponents.size() - 1)) {
- // Keep the deleted tuples since the oldest disk component
- // is not included in the merge operation
- long numElements = 0L;
- for (int i = 0; i < mergeOp.getMergingComponents().size();
++i) {
- numElements += ((LSMRTreeDiskComponent)
mergeOp.getMergingComponents().get(i)).getBloomFilter()
- .getNumElements();
- }
- componentBulkLoader =
mergedComponent.createBulkLoader(mergeOp, 1.0f, false, numElements, false,
- false, false,
pageWriteCallbackFactory.createPageWriteCallback());
- mergeLoadBTree(mergeOp, opCtx, rtreeSearchPred,
componentBulkLoader);
- } else {
- //no buddy-btree needed
- componentBulkLoader =
mergedComponent.createBulkLoader(mergeOp, 1.0f, false, 0L, false, false,
- false,
pageWriteCallbackFactory.createPageWriteCallback());
- }
- //search old rtree components
- while (cursor.hasNext()) {
- cursor.next();
- ITupleReference frameTuple = cursor.getTuple();
- componentBulkLoader.add(frameTuple);
+ // In case we must keep the deleted-keys BTrees, then they must be
merged
+ // *before* merging the r-trees so that
+ // lsmHarness.endSearch() is called once when the r-trees have
been merged.
+ if
(mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1)
!= diskComponents
+ .get(diskComponents.size() - 1)) {
+ // Keep the deleted tuples since the oldest disk component
+ // is not included in the merge operation
+ long numElements = 0L;
+ for (int i = 0; i < mergeOp.getMergingComponents().size();
++i) {
+ numElements += ((LSMRTreeDiskComponent)
mergeOp.getMergingComponents().get(i)).getBloomFilter()
+ .getNumElements();
}
- } finally {
- cursor.close();
+ componentBulkLoader =
mergedComponent.createBulkLoader(mergeOp, 1.0f, false, numElements, false,
false,
+ false,
pageWriteCallbackFactory.createPageWriteCallback());
+ mergeLoadBTree(mergeOp, opCtx, rtreeSearchPred,
componentBulkLoader);
+ } else {
+ //no buddy-btree needed
+ componentBulkLoader =
mergedComponent.createBulkLoader(mergeOp, 1.0f, false, 0L, false, false, false,
+ pageWriteCallbackFactory.createPageWriteCallback());
}
- abort = false;
- } finally {
- if (abort && componentBulkLoader != null) {
- componentBulkLoader.abort();
+ //search old rtree components
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference frameTuple = cursor.getTuple();
+ componentBulkLoader.add(frameTuple);
}
+ } finally {
+ cursor.close();
}
return componentBulkLoader;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 729ca74363..6968b3c3c3 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -103,107 +103,111 @@ public class LSMRTreeWithAntiMatterTuples extends
AbstractLSMRTree {
TreeTupleSorter rTreeTupleSorter = null;
TreeTupleSorter bTreeTupleSorter = null;
boolean isEmpty = true;
- boolean abort = true;
try {
- RTreeAccessor memRTreeAccessor =
-
flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
try {
- RTreeSearchCursor rtreeScanCursor =
memRTreeAccessor.createSearchCursor(false);
+ RTreeAccessor memRTreeAccessor =
+
flushingComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
+
try {
- memRTreeAccessor.search(rtreeScanCursor,
rtreeNullPredicate);
- component = createDiskComponent(componentFactory,
flushOp.getTarget(), null, null, true);
- componentBulkLoader =
component.createBulkLoader(operation, 1.0f, false, 0L, false, false, false,
-
pageWriteCallbackFactory.createPageWriteCallback());
- // Since the LSM-RTree is used as a secondary assumption,
the
- // primary key will be the last comparator in the BTree
comparators
- rTreeTupleSorter = new
TreeTupleSorter(flushingComponent.getIndex().getFileId(), linearizerArray,
- rtreeLeafFrameFactory.createFrame(),
rtreeLeafFrameFactory.createFrame(),
- flushingComponent.getIndex().getBufferCache(),
comparatorFields);
+ RTreeSearchCursor rtreeScanCursor =
memRTreeAccessor.createSearchCursor(false);
try {
- isEmpty = scanAndSort(rtreeScanCursor,
rTreeTupleSorter);
+ memRTreeAccessor.search(rtreeScanCursor,
rtreeNullPredicate);
+ component = createDiskComponent(componentFactory,
flushOp.getTarget(), null, null, true);
+ componentBulkLoader =
component.createBulkLoader(operation, 1.0f, false, 0L, false, false,
+ false,
pageWriteCallbackFactory.createPageWriteCallback());
+ // Since the LSM-RTree is used as a secondary
assumption, the
+ // primary key will be the last comparator in the
BTree comparators
+ rTreeTupleSorter =
+ new
TreeTupleSorter(flushingComponent.getIndex().getFileId(), linearizerArray,
+ rtreeLeafFrameFactory.createFrame(),
rtreeLeafFrameFactory.createFrame(),
+
flushingComponent.getIndex().getBufferCache(), comparatorFields);
+ try {
+ isEmpty = scanAndSort(rtreeScanCursor,
rTreeTupleSorter);
+ } finally {
+ rtreeScanCursor.close();
+ }
} finally {
- rtreeScanCursor.close();
+ rtreeScanCursor.destroy();
}
} finally {
- rtreeScanCursor.destroy();
+ memRTreeAccessor.destroy();
}
- } finally {
- memRTreeAccessor.destroy();
- }
- if (!isEmpty) {
- rTreeTupleSorter.sort();
- }
- // scan the memory BTree
- RangePredicate btreeNullPredicate = new RangePredicate(null, null,
true, true, null, null);
- BTreeAccessor memBTreeAccessor =
-
flushingComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
- try {
- bTreeTupleSorter = new
TreeTupleSorter(flushingComponent.getBuddyIndex().getFileId(), linearizerArray,
- btreeLeafFrameFactory.createFrame(),
btreeLeafFrameFactory.createFrame(),
- flushingComponent.getBuddyIndex().getBufferCache(),
comparatorFields);
- BTreeRangeSearchCursor btreeScanCursor =
- (BTreeRangeSearchCursor)
memBTreeAccessor.createSearchCursor(false);
+ if (!isEmpty) {
+ rTreeTupleSorter.sort();
+ }
+ // scan the memory BTree
+ RangePredicate btreeNullPredicate = new RangePredicate(null,
null, true, true, null, null);
+ BTreeAccessor memBTreeAccessor =
+
flushingComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
- isEmpty = true;
- memBTreeAccessor.search(btreeScanCursor,
btreeNullPredicate);
+ bTreeTupleSorter = new
TreeTupleSorter(flushingComponent.getBuddyIndex().getFileId(),
+ linearizerArray,
btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(),
+
flushingComponent.getBuddyIndex().getBufferCache(), comparatorFields);
+ BTreeRangeSearchCursor btreeScanCursor =
+ (BTreeRangeSearchCursor)
memBTreeAccessor.createSearchCursor(false);
try {
- isEmpty = scanAndSort(btreeScanCursor,
bTreeTupleSorter);
+ isEmpty = true;
+ memBTreeAccessor.search(btreeScanCursor,
btreeNullPredicate);
+ try {
+ isEmpty = scanAndSort(btreeScanCursor,
bTreeTupleSorter);
+ } finally {
+ btreeScanCursor.close();
+ }
} finally {
- btreeScanCursor.close();
+ btreeScanCursor.destroy();
}
} finally {
- btreeScanCursor.destroy();
+ memBTreeAccessor.destroy();
}
- } finally {
- memBTreeAccessor.destroy();
- }
- if (!isEmpty) {
- bTreeTupleSorter.sort();
- }
- LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new
LSMRTreeWithAntiMatterTuplesFlushCursor(
- rTreeTupleSorter, bTreeTupleSorter, comparatorFields,
linearizerArray);
- try {
- cursor.open(null, null);
+ if (!isEmpty) {
+ bTreeTupleSorter.sort();
+ }
+ LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new
LSMRTreeWithAntiMatterTuplesFlushCursor(
+ rTreeTupleSorter, bTreeTupleSorter, comparatorFields,
linearizerArray);
try {
- while (cursor.hasNext()) {
- cursor.next();
- ITupleReference frameTuple = cursor.getTuple();
- componentBulkLoader.add(frameTuple);
+ cursor.open(null, null);
+ try {
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference frameTuple = cursor.getTuple();
+ componentBulkLoader.add(frameTuple);
+ }
+ } finally {
+ cursor.close();
}
} finally {
- cursor.close();
+ cursor.destroy();
}
- } finally {
- cursor.destroy();
- }
- if (component.getLSMComponentFilter() != null) {
- List<ITupleReference> filterTuples = new ArrayList<>();
-
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
-
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
-
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
- NoOpOperationCallback.INSTANCE);
-
getFilterManager().writeFilter(component.getLSMComponentFilter(),
component.getMetadataHolder());
- }
- flushingComponent.getMetadata().copy(component.getMetadata());
- abort = false;
- componentBulkLoader.end();
- } finally {
- try {
- if (rTreeTupleSorter != null) {
- rTreeTupleSorter.destroy();
+ if (component.getLSMComponentFilter() != null) {
+ List<ITupleReference> filterTuples = new ArrayList<>();
+
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
+
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
+
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
+
getFilterManager().writeFilter(component.getLSMComponentFilter(),
component.getMetadataHolder());
}
+ flushingComponent.getMetadata().copy(component.getMetadata());
+ componentBulkLoader.end();
} finally {
try {
- if (bTreeTupleSorter != null) {
- bTreeTupleSorter.destroy();
+ if (rTreeTupleSorter != null) {
+ rTreeTupleSorter.destroy();
}
} finally {
- if (abort && componentBulkLoader != null) {
- componentBulkLoader.abort();
+ if (bTreeTupleSorter != null) {
+ bTreeTupleSorter.destroy();
}
}
}
+ } catch (Throwable e) {
+ try {
+ if (componentBulkLoader != null) {
+ componentBulkLoader.abort();
+ }
+ } catch (Throwable th) {
+ e.addSuppressed(th);
+ }
+ throw e;
}
return component;
}
@@ -243,26 +247,35 @@ public class LSMRTreeWithAntiMatterTuples extends
AbstractLSMRTree {
ILSMDiskComponentBulkLoader componentBulkLoader =
component.createBulkLoader(operation, 1.0f, false, 0L, false,
false, false,
pageWriteCallbackFactory.createPageWriteCallback());
try {
- while (cursor.hasNext()) {
- cursor.next();
- ITupleReference frameTuple = cursor.getTuple();
- componentBulkLoader.add(frameTuple);
+ try {
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference frameTuple = cursor.getTuple();
+ componentBulkLoader.add(frameTuple);
+ }
+ } finally {
+ cursor.close();
}
- } finally {
- cursor.close();
- }
- if (component.getLSMComponentFilter() != null) {
- List<ITupleReference> filterTuples = new ArrayList<>();
- for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
-
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
-
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
+ if (component.getLSMComponentFilter() != null) {
+ List<ITupleReference> filterTuples = new ArrayList<>();
+ for (int i = 0; i < mergeOp.getMergingComponents().size();
++i) {
+
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
+
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
+ }
+
getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
+
getFilterManager().writeFilter(component.getLSMComponentFilter(),
component.getMetadataHolder());
}
- getFilterManager().updateFilter(component.getLSMComponentFilter(),
filterTuples,
- NoOpOperationCallback.INSTANCE);
- getFilterManager().writeFilter(component.getLSMComponentFilter(),
component.getMetadataHolder());
- }
- componentBulkLoader.end();
+ componentBulkLoader.end();
+ } catch (Throwable e) {
+ try {
+ componentBulkLoader.abort();
+ } catch (Throwable th) {
+ e.addSuppressed(th);
+ }
+ throw e;
+ }
return component;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 35785d3e48..33ba01ecdc 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -976,17 +976,22 @@ public class RTree extends AbstractTreeIndex {
@Override
public void end() throws HyracksDataException {
- pagesToWrite.clear();
- //if writing a trivial 1-page tree, don't try and propagate up
- if (nodeFrontiers.size() > 1) {
- propagateBulk(1, true, pagesToWrite);
- }
+ try {
+ pagesToWrite.clear();
+ //if writing a trivial 1-page tree, don't try and propagate up
+ if (nodeFrontiers.size() > 1) {
+ propagateBulk(1, true, pagesToWrite);
+ }
- for (ICachedPage c : pagesToWrite) {
- write(c);
+ for (ICachedPage c : pagesToWrite) {
+ write(c);
+ }
+ finish();
+ super.end();
+ } catch (HyracksDataException e) {
+ handleException();
+ throw e;
}
- finish();
- super.end();
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 4c2395fd12..a4ecb6f490 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -499,11 +499,13 @@ public class BufferCache implements IBufferCacheInternal,
ILifeCycleComponent, I
synchronized (cachedPages) {
for (ICachedPageInternal internalPage : cachedPages) {
CachedPage c = (CachedPage) internalPage;
- if (c.confiscated() || c.latch.getReadLockCount() != 0 ||
c.latch.getWriteHoldCount() != 0) {
- return false;
- }
- if (c.valid) {
- reachableDpids.add(c.dpid);
+ if (c != null) {
+ if (c.confiscated() || c.latch.getReadLockCount() != 0 ||
c.latch.getWriteHoldCount() != 0) {
+ return false;
+ }
+ if (c.valid) {
+ reachableDpids.add(c.dpid);
+ }
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
index 7d4e119bef..219275e4b9 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
@@ -15,6 +15,8 @@
package org.apache.hyracks.storage.common.buffercache;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.LogManager;
@@ -38,18 +40,21 @@ public class FIFOLocalWriter implements IFIFOPageWriter {
this.context = context;
}
- @SuppressWarnings("squid:S1181") // System must halt on all IO errors
+ @SuppressWarnings("squid:S1181")
@Override
- public void write(ICachedPage page) {
+ public void write(ICachedPage page) throws HyracksDataException {
CachedPage cPage = (CachedPage) page;
try {
callback.beforeWrite(cPage);
bufferCache.write(cPage, context);
callback.afterWrite(cPage);
+ } catch (HyracksDataException e) {
+ LOGGER.warn("Failed to write page {}", cPage, e);
+ throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION,
e);
} catch (Throwable th) {
// Halt
- LOGGER.error("Failed to write page {}", cPage, th);
- ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
+ LOGGER.error("FIFOLocalWriter has encountered a fatal error", th);
+ ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
} finally {
bufferCache.returnPage(cPage);
if (DEBUG) {
@@ -57,5 +62,4 @@ public class FIFOLocalWriter implements IFIFOPageWriter {
}
}
}
-
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
index 31e0e85b1f..2dc4a585f5 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
@@ -15,7 +15,9 @@
package org.apache.hyracks.storage.common.buffercache;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
@FunctionalInterface
public interface IFIFOPageWriter {
- void write(ICachedPage page);
+ void write(ICachedPage page) throws HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
index b332a2de14..080b9ea16a 100644
---
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
+++
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
@@ -21,22 +21,31 @@ package org.apache.hyracks.util;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
public class ExponentialRetryPolicy implements IRetryPolicy {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ public static final String CLOUD_UNSTABLE_MODE = "cloud.unstable.mode";
private static final int DEFAULT_MAX_RETRIES = 10;
private static final long DEFAULT_INITIAL_DELAY_IN_MILLIS = 100;
private static final long DEFAULT_MAX_DELAY_IN_MILLIS = Long.MAX_VALUE - 1;
+ private static final int UNSTABLE_NUMBER_OF_RETRIES = 100;
private final int maxRetries;
private final long maxDelay;
private int attempt = 0;
private long delay;
+ private boolean printDebugLines = true;
/**
* Default constructor for ExponentialRetryPolicy.
* Initializes with default max retries, initial delay, and max delay.
*/
public ExponentialRetryPolicy() {
- this(DEFAULT_MAX_RETRIES, DEFAULT_INITIAL_DELAY_IN_MILLIS,
DEFAULT_MAX_DELAY_IN_MILLIS);
+ this(isUnstable() ? UNSTABLE_NUMBER_OF_RETRIES : DEFAULT_MAX_RETRIES,
DEFAULT_INITIAL_DELAY_IN_MILLIS,
+ isUnstable() ? 0 : DEFAULT_MAX_DELAY_IN_MILLIS);
}
/**
@@ -71,13 +80,18 @@ public class ExponentialRetryPolicy implements IRetryPolicy
{
*/
public ExponentialRetryPolicy(int maxRetries, long maxDelay) {
this(maxRetries, DEFAULT_INITIAL_DELAY_IN_MILLIS, maxDelay);
+ printDebugLines = false;
}
@Override
public boolean retry(Throwable failure) {
if (attempt < maxRetries) {
try {
-
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextLong(1 + delay));
+ long sleepTime = ThreadLocalRandom.current().nextLong(1 +
delay);
+ if (printDebugLines) {
+ LOGGER.info("Retrying after {}ms, attempt: {}/{}",
sleepTime, attempt + 1, maxRetries);
+ }
+ TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -87,4 +101,8 @@ public class ExponentialRetryPolicy implements IRetryPolicy {
}
return false;
}
+
+ private static boolean isUnstable() {
+ return Boolean.getBoolean(CLOUD_UNSTABLE_MODE);
+ }
}