This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 17670aab184fe12fa30dc79376e819e07fac43c4 Author: Murtadha Hubail <[email protected]> AuthorDate: Wed Mar 31 21:18:18 2021 +0300 [NO ISSUE][STO] Delete invalid indexes during cluster global recovery - user model changes: yes - storage format changes: no - interface changes: no Details: - Before starting cluster global recovery, send to all NCs valid dataset ids from the metadata node. - Delete any invalid indexes on NCs based on the metadata received from the CC. - Add storage options to enable/disable global storage recovery. This allows tests that create storage objects without using the metadata node to bypass global cleanup. - Add storage option to specify the timeout for nodes to perform global storage cleanup. - Add test case for global storage recovery. - Adapt existing test cases that require bypassing global cleanup. Change-Id: Idee73e57fa5879c3b9aab5f881bf848e225f874b Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10784 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- .../app/message/StorageCleanupRequestMessage.java | 108 +++++++++++++++++++++ .../apache/asterix/app/message/VoidResponse.java | 63 ++++++++++++ .../hyracks/bootstrap/GlobalRecoveryManager.java | 30 ++++++ .../test/dataflow/GlobalStorageCleanupTest.java | 68 +++++++++++++ .../test/dataflow/LSMFlushRecoveryTest.java | 12 ++- .../api/cluster_state_1/cluster_state_1.1.regexadm | 1 + .../cluster_state_1_full.1.regexadm | 1 + .../cluster_state_1_less.1.regexadm | 1 + .../asterix/common/config/StorageProperties.java | 25 ++++- 9 files changed, 304 insertions(+), 5 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java new file mode 100644 index 0000000..85269a1 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import static org.apache.hyracks.util.ExitUtil.EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED; + +import java.util.Map; +import java.util.Set; + +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.dataflow.DatasetLocalResource; +import org.apache.asterix.common.messaging.CcIdentifiedMessage; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.LocalResource; +import org.apache.hyracks.util.ExitUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class StorageCleanupRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = LogManager.getLogger(); + private final Set<Integer> validDatasetIds; + private final long reqId; + + public StorageCleanupRequestMessage(long reqId, Set<Integer> validDatasetIds) { + this.validDatasetIds = validDatasetIds; + this.reqId = reqId; + } + + @Override + public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException { + INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker(); + PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) appContext.getLocalResourceRepository(); + Map<Long, LocalResource> localResources = localResourceRepository.loadAndGetAllResources(); + for (LocalResource resource : localResources.values()) { + DatasetLocalResource lr = (DatasetLocalResource) resource.getResource(); + if (MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())) { + // skip metadata indexes + continue; + } + if (!validDatasetIds.contains(lr.getDatasetId())) { + LOGGER.warn("found invalid index {} with dataset id {}", resource.getPath(), lr.getDatasetId()); + deleteInvalidIndex(appContext, localResourceRepository, resource); + } + } + try { + broker.sendMessageToPrimaryCC(new VoidResponse(reqId, null)); + } catch (Exception e) { + LOGGER.error("failed to notify CC of storage clean up; halting...", e); + ExitUtil.halt(EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED); + } + } + + private void deleteInvalidIndex(INcApplicationContext appContext, + PersistentLocalResourceRepository localResourceRepository, LocalResource resource) + throws HyracksDataException { + IDatasetLifecycleManager lcManager = appContext.getDatasetLifecycleManager(); + String resourceRelPath = resource.getPath(); + synchronized (lcManager) { + IIndex index; + index = lcManager.get(resourceRelPath); + if (index != null) { + LOGGER.warn("unregistering invalid index {}", resourceRelPath); + lcManager.unregister(resourceRelPath); + } else { + LOGGER.warn("initializing unregistered invalid index {}", resourceRelPath); + try { + index = resource.getResource().createInstance(appContext.getServiceContext()); + } catch (Exception e) { + LOGGER.warn("failed to initialize invalid index {}", resourceRelPath, e); + } + } + localResourceRepository.delete(resourceRelPath); + if (index != null) { + index.destroy(); + } + } + } + + @Override + public String toString() { + return StorageCleanupRequestMessage.class.getSimpleName(); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/VoidResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/VoidResponse.java new file mode 100644 index 0000000..6a51c2d --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/VoidResponse.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.messaging.api.ICCMessageBroker.ResponseState; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.common.messaging.api.INcResponse; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * A response to a request only indicating success or failure + */ +public class VoidResponse implements ICcAddressedMessage, INcResponse { + + private static final long serialVersionUID = 1L; + private final Long reqId; + private final Throwable failure; + + public VoidResponse(Long reqId, Throwable failure) { + this.reqId = reqId; + this.failure = failure; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + broker.respond(reqId, this); + } + + @Override + public void setResult(MutablePair<ResponseState, Object> result) { + if (failure != null) { + result.setLeft(ResponseState.FAILURE); + result.setRight(failure); + } else { + result.setLeft(ResponseState.SUCCESS); + } + } + + @Override + public String toString() { + return "{ \"response\" : \"" + (failure == null ? "success" : failure.getClass().getSimpleName()) + "\"}"; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java index 8165316..cf2af95 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java @@ -18,11 +18,15 @@ */ package org.apache.asterix.hyracks.bootstrap; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.asterix.app.message.StorageCleanupRequestMessage; import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; @@ -32,6 +36,7 @@ import org.apache.asterix.common.config.DatasetConfig.TransactionState; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.indexing.ExternalFile; +import org.apache.asterix.messaging.CCMessageBroker; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -111,6 +116,10 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager { LOGGER.info("Starting Global Recovery"); MetadataManager.INSTANCE.init(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + if (appCtx.getStorageProperties().isStorageGlobalCleanup()) { + int storageGlobalCleanupTimeout = appCtx.getStorageProperties().getStorageGlobalCleanupTimeout(); + performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout); + } mdTxnCtx = doRecovery(appCtx, mdTxnCtx); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); recoveryCompleted = true; @@ -122,6 +131,27 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager { } } + protected void performGlobalStorageCleanup(MetadataTransactionContext mdTxnCtx, int storageGlobalCleanupTimeoutSecs) + throws Exception { + List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx); + Set<Integer> validDatasetIds = new HashSet<>(); + for (Dataverse dataverse : dataverses) { + List<Dataset> dataverseDatasets = + MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName()); + dataverseDatasets.stream().map(Dataset::getDatasetId).forEach(validDatasetIds::add); + } + ICcApplicationContext ccAppCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + final List<String> ncs = new ArrayList<>(ccAppCtx.getClusterStateManager().getParticipantNodes()); + CCMessageBroker messageBroker = (CCMessageBroker) ccAppCtx.getServiceContext().getMessageBroker(); + long reqId = messageBroker.newRequestId(); + List<StorageCleanupRequestMessage> requests = new ArrayList<>(); + for (int i = 0; i < ncs.size(); i++) { + requests.add(new StorageCleanupRequestMessage(reqId, validDatasetIds)); + } + messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, + TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs)); + } + protected MetadataTransactionContext doRecovery(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx) throws Exception { // Loop over datasets diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalStorageCleanupTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalStorageCleanupTest.java new file mode 100644 index 0000000..84107fb --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalStorageCleanupTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.dataflow; + +import java.io.File; + +import org.apache.asterix.app.bootstrap.TestNodeController; +import org.apache.asterix.test.common.TestHelper; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class GlobalStorageCleanupTest { + + public static final Logger LOGGER = LogManager.getLogger(); + private static TestNodeController nc; + + @BeforeClass + public static void setUp() throws Exception { + System.out.println("SetUp: "); + TestHelper.deleteExistingInstanceFiles(); + String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test" + + File.separator + "resources" + File.separator + "cc.conf"; + nc = new TestNodeController(configPath, false); + } + + @Test + public void globalStorageCleanup() throws Exception { + nc.init(true); + LSMFlushRecoveryTest.nc = nc; + LSMFlushRecoveryTest lsmFlushRecoveryTest = new LSMFlushRecoveryTest(); + lsmFlushRecoveryTest.initializeTestCtx(); + lsmFlushRecoveryTest.createIndex(); + lsmFlushRecoveryTest.readIndex(); + nc.deInit(false); + nc.init(false); + // the index should deleted after the node initialization + lsmFlushRecoveryTest.initializeTestCtx(); + boolean failedToReadIndex = false; + try { + lsmFlushRecoveryTest.readIndex(); + } catch (Exception e) { + failedToReadIndex = true; + Assert.assertTrue(e.getMessage().contains(ErrorCode.INDEX_DOES_NOT_EXIST.errorCode())); + } + Assert.assertTrue(failedToReadIndex); + nc.deInit(false); + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java index c3a6839..9c6e95e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java @@ -77,7 +77,7 @@ import org.junit.Test; public class LSMFlushRecoveryTest { public static final Logger LOGGER = LogManager.getLogger(); - private static TestNodeController nc; + public static TestNodeController nc; private static Dataset dataset; private static PrimaryIndexInfo[] primaryIndexInfos; private static SecondaryIndexInfo[] secondaryIndexInfo; @@ -156,6 +156,10 @@ public class LSMFlushRecoveryTest { } private void initializeNc(boolean cleanUpOnStart) throws Exception { + // disable global clean up for this test to allow internal index creation + List<Pair<IOption, Object>> opts = new ArrayList<>(); + opts.add(Pair.of(Option.STORAGE_GLOBAL_CLEANUP, false)); + nc.setOpts(opts); nc.init(cleanUpOnStart); ncAppCtx = nc.getAppRuntimeContext(); // Override the LSMIOScheduler to avoid halting on failure and enable @@ -177,7 +181,7 @@ public class LSMFlushRecoveryTest { dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager(); } - private void createIndex() throws Exception { + public void createIndex() throws Exception { dataset = StorageTestUtils.DATASET; secondaryIndexEntity = new Index(dataset.getDataverseName(), dataset.getDatasetName(), SECONDARY_INDEX_NAME, SECONDARY_INDEX_TYPE, SECONDARY_INDEX_FIELD_NAMES, SECONDARY_INDEX_FIELD_INDICATORS, @@ -193,7 +197,7 @@ public class LSMFlushRecoveryTest { } - private void initializeTestCtx() throws Exception { + public void initializeTestCtx() throws Exception { JobId jobId = nc.newJobId(); testCtxs = new IHyracksTaskContext[NUM_PARTITIONS]; for (int i = 0; i < NUM_PARTITIONS; i++) { @@ -203,7 +207,7 @@ public class LSMFlushRecoveryTest { new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); } - private void readIndex() throws HyracksDataException { + public void readIndex() throws HyracksDataException { primaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS]; primaryIndexes = new TestLsmBtree[NUM_PARTITIONS]; for (int i = 0; i < NUM_PARTITIONS; i++) { diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index eba50dc..f4dfdd1 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -44,6 +44,7 @@ "replication\.timeout" : 30, "ssl\.enabled" : false, "storage.compression.block" : "snappy", + "storage.global.cleanup.timeout" : 600, "storage.lsm.bloomfilter.falsepositiverate" : 0.01, "txn\.commitprofiler\.enabled" : false, "txn\.commitprofiler\.reportinterval" : 5, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index 98faa65..8fc48f9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -44,6 +44,7 @@ "replication\.timeout" : 30, "ssl\.enabled" : false, "storage.compression.block" : "snappy", + "storage.global.cleanup.timeout" : 600, "storage.lsm.bloomfilter.falsepositiverate" : 0.01, "txn\.commitprofiler\.enabled" : false, "txn\.commitprofiler\.reportinterval" : 5, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index a92f7d1..ed265e5 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -44,6 +44,7 @@ "replication\.timeout" : 30, "ssl\.enabled" : false, "storage.compression.block" : "snappy", + "storage.global.cleanup.timeout" : 600, "storage.lsm.bloomfilter.falsepositiverate" : 0.01, "txn\.commitprofiler\.enabled" : false, "txn\.commitprofiler\.reportinterval" : 5, diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java index d9463bf..12c9c68 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.common.config; +import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN; import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT; import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT; @@ -27,6 +28,7 @@ import static org.apache.hyracks.control.common.config.OptionTypes.STRING; import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE; import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; @@ -57,7 +59,9 @@ public class StorageProperties extends AbstractProperties { STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l), STORAGE_MAX_CONCURRENT_FLUSHES_PER_PARTITION(NONNEGATIVE_INTEGER, 2), STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 8), - STORAGE_MAX_CONCURRENT_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 2); + STORAGE_MAX_CONCURRENT_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 2), + STORAGE_GLOBAL_CLEANUP(BOOLEAN, true), + STORAGE_GLOBAL_CLEANUP_TIMEOUT(POSITIVE_INTEGER, (int) TimeUnit.MINUTES.toSeconds(10)); private final IOptionType interpreter; private final Object defaultValue; @@ -72,6 +76,8 @@ public class StorageProperties extends AbstractProperties { switch (this) { case STORAGE_COMPRESSION_BLOCK: case STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE: + case STORAGE_GLOBAL_CLEANUP: + case STORAGE_GLOBAL_CLEANUP_TIMEOUT: return Section.COMMON; default: return Section.NC; @@ -119,6 +125,10 @@ public class StorageProperties extends AbstractProperties { return "The maximum number of scheduled merges per partition (0 means unlimited)"; case STORAGE_MAX_CONCURRENT_MERGES_PER_PARTITION: return "The maximum number of concurrently executed merges per partition (0 means unlimited)"; + case STORAGE_GLOBAL_CLEANUP: + return "Indicates whether or not global storage cleanup is performed"; + case STORAGE_GLOBAL_CLEANUP_TIMEOUT: + return "The maximum time to wait for nodes to respond to global storage cleanup requests"; default: throw new IllegalStateException("NYI: " + this); } @@ -138,6 +148,11 @@ public class StorageProperties extends AbstractProperties { public String usageDefaultOverride(IApplicationConfig accessor, Function<IOption, String> optionPrinter) { return null; } + + @Override + public boolean hidden() { + return this == STORAGE_GLOBAL_CLEANUP; + } } public static final long MAX_HEAP_BYTES = Runtime.getRuntime().maxMemory(); @@ -227,6 +242,14 @@ public class StorageProperties extends AbstractProperties { return value != 0 ? value * numPartitions : Integer.MAX_VALUE; } + public boolean isStorageGlobalCleanup() { + return accessor.getBoolean(Option.STORAGE_GLOBAL_CLEANUP); + } + + public int getStorageGlobalCleanupTimeout() { + return accessor.getInt(Option.STORAGE_GLOBAL_CLEANUP_TIMEOUT); + } + protected int getMetadataDatasets() { return MetadataIndexImmutableProperties.METADATA_DATASETS_COUNT; }
