This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 42c18e65fb [ASTERIXDB-3387][STO] Enable Selective caching policy
42c18e65fb is described below
commit 42c18e65fb44113f3bee8a3ce7ac38259967e4de
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Wed May 15 16:08:04 2024 -0700
[ASTERIXDB-3387][STO] Enable Selective caching policy
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
Allow user to enable/select Selective caching policy
Change-Id: Ia212db8c1673f6b725eb21fe7035a1e20490d1f6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18291
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../apache/asterix/app/nc/NCAppRuntimeContext.java | 47 ++++--
.../nc/task/CloudToLocalStorageCachingTask.java | 2 +
.../asterix/hyracks/bootstrap/CCApplication.java | 5 +-
.../api/cluster_state_1/cluster_state_1.1.regexadm | 6 +
.../cluster_state_1_full.1.regexadm | 6 +
.../cluster_state_1_less.1.regexadm | 6 +
.../apache/asterix/cloud/CloudConfigurator.java | 180 +++++++++++++++++++++
.../apache/asterix/cloud/EagerCloudIOManager.java | 2 +-
.../apache/asterix/cloud/LazyCloudIOManager.java | 8 +-
.../apache/asterix/cloud/lazy/ParallelCacher.java | 2 +-
.../lazy/accessor/SelectiveCloudAccessor.java | 19 ++-
.../cloud/lazy/filesystem/HolePuncherProvider.java | 28 +++-
.../asterix/common/api/INcApplicationContext.java | 6 +
.../asterix/common/config/CloudProperties.java | 75 ++++++++-
.../common/context/DatasetLifecycleManager.java | 13 +-
.../runtime/utils/RuntimeComponentsProvider.java | 23 ++-
...udDiskCacheMonitoringAndPrefetchingService.java | 7 +
.../CloudDiskResourceCacheLockNotifier.java | 148 +++++++++++++----
.../cache/service/DiskCacheSweeperThread.java | 80 +++++++--
.../cloud/cache/unit/AbstractIndexUnit.java | 59 +++++++
.../hyracks/cloud/cache/unit/DatasetUnit.java | 106 ------------
.../{IndexUnit.java => SweepableIndexUnit.java} | 38 ++---
.../unit/UnsweepableIndexUnit.java} | 22 ++-
.../apache/hyracks/cloud/io/ICloudIOManager.java | 7 +-
.../org/apache/hyracks/cloud/sweeper/ISweeper.java | 4 +-
.../apache/hyracks/cloud/sweeper/NoOpSweeper.java | 4 +-
.../apache/hyracks/cloud/sweeper/SweepContext.java | 8 +-
.../org/apache/hyracks/cloud/sweeper/Sweeper.java | 8 +-
.../apache/hyracks/control/nc/io/IOManager.java | 8 +
.../hyracks/control/nc/io/NoOpDiskSpaceMaker.java} | 17 +-
.../btree/helper/BTreeHelperStorageManager.java | 7 +
.../btree/column/cloud/sweep/ColumnSweeper.java | 4 +-
.../dataflow/LSMColumnBTreeLocalResource.java | 4 +-
.../lsm/btree/column/utils/LSMColumnBTreeUtil.java | 23 ++-
.../am/lsm/btree/impls/LSMBTreeFileManager.java | 19 ++-
.../storage/am/lsm/btree/utils/LSMBTreeUtil.java | 2 +-
.../hyracks/storage/common/IStorageManager.java | 7 +
.../common/disk/IDiskCacheMonitoringService.java | 11 ++
.../disk/IDiskResourceCacheLockNotifier.java | 13 +-
.../disk/NoOpDiskCacheMonitoringService.java | 9 ++
.../disk/NoOpDiskResourceCacheLockNotifier.java | 8 +-
.../hyracks/test/support/TestStorageManager.java | 7 +
42 files changed, 793 insertions(+), 265 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index cd52b9dd90..1d6a0d8978 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.CloudConfigurator;
import org.apache.asterix.cloud.LocalPartitionBootstrapper;
import org.apache.asterix.common.api.IConfigValidator;
import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -122,6 +122,10 @@ import
org.apache.hyracks.storage.common.buffercache.IPageCleanerPolicy;
import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
import
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
import
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import
org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.FileMapManager;
import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
@@ -180,6 +184,7 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
private IPartitionBootstrapper partitionBootstrapper;
private final INamespacePathResolver namespacePathResolver;
private final INamespaceResolver namespaceResolver;
+ private IDiskCacheMonitoringService diskCacheService;
public NCAppRuntimeContext(INCServiceContext ncServiceContext,
NCExtensionManager extensionManager,
IPropertiesFactory propertiesFactory, INamespaceResolver
namespaceResolver,
@@ -211,16 +216,26 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
IConfigValidatorFactory configValidatorFactory,
IReplicationStrategyFactory replicationStrategyFactory,
boolean initialRun) throws IOException {
ioManager = getServiceContext().getIoManager();
- IDiskCachedPageAllocator pageAllocator =
DefaultDiskCachedPageAllocator.INSTANCE;
- IBufferCacheReadContext defaultContext =
DefaultBufferCacheReadContextProvider.DEFAULT;
+ CloudConfigurator cloudConfigurator;
+ IDiskResourceCacheLockNotifier lockNotifier;
+ IDiskCachedPageAllocator pageAllocator;
+ IBufferCacheReadContext defaultContext;
if (isCloudDeployment()) {
- persistenceIOManager =
- CloudManagerProvider.createIOManager(cloudProperties,
ioManager, namespacePathResolver);
- partitionBootstrapper =
CloudManagerProvider.getCloudPartitionBootstrapper(persistenceIOManager);
+ cloudConfigurator = CloudConfigurator.of(cloudProperties,
ioManager, namespacePathResolver);
+ persistenceIOManager = cloudConfigurator.getCloudIoManager();
+ partitionBootstrapper =
cloudConfigurator.getPartitionBootstrapper();
+ lockNotifier = cloudConfigurator.getLockNotifier();
+ pageAllocator = cloudConfigurator.getPageAllocator();
+ defaultContext = cloudConfigurator.getDefaultContext();
} else {
+ cloudConfigurator = null;
persistenceIOManager = ioManager;
partitionBootstrapper = new LocalPartitionBootstrapper(ioManager);
+ lockNotifier = NoOpDiskResourceCacheLockNotifier.INSTANCE;
+ pageAllocator = DefaultDiskCachedPageAllocator.INSTANCE;
+ defaultContext = DefaultBufferCacheReadContextProvider.DEFAULT;
}
+
int ioQueueLen =
getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
threadExecutor =
MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
@@ -261,9 +276,8 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
// Must start vbc now instead of by life cycle component manager
(lccm) because lccm happens after
// the metadata bootstrap task
((ILifeCycleComponent) virtualBufferCache).start();
- datasetLifecycleManager =
- new DatasetLifecycleManager(storageProperties,
localResourceRepository, txnSubsystem.getLogManager(),
- virtualBufferCache, indexCheckpointManagerProvider,
ioManager.getIODevices().size());
+ datasetLifecycleManager = new
DatasetLifecycleManager(storageProperties, localResourceRepository,
+ txnSubsystem.getLogManager(), virtualBufferCache,
indexCheckpointManagerProvider, lockNotifier);
localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
final String nodeId = getServiceContext().getNodeId();
final Set<Integer> nodePartitions =
metadataProperties.getNodePartitions(nodeId);
@@ -300,6 +314,15 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
fileInfoMap, defaultContext);
}
+ if (cloudConfigurator != null) {
+ diskCacheService =
+
cloudConfigurator.createDiskCacheMonitoringService(getServiceContext(),
bufferCache, fileInfoMap);
+ } else {
+ diskCacheService = NoOpDiskCacheMonitoringService.INSTANCE;
+ }
+
+ diskCacheService.start();
+
NodeControllerService ncs = (NodeControllerService)
getServiceContext().getControllerService();
FileReference appDir =
ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
@@ -351,6 +374,7 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
@Override
public synchronized void preStop() throws Exception {
activeManager.shutdown();
+ diskCacheService.stop();
if (metadataNodeStub != null) {
unexportMetadataNodeStub();
}
@@ -694,6 +718,11 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
return diskWriteRateLimiterProvider;
}
+ @Override
+ public IDiskCacheMonitoringService getDiskCacheService() {
+ return diskCacheService;
+ }
+
@Override
public boolean isCloudDeployment() {
return ncServiceContext.getAppConfig().getBoolean(CLOUD_DEPLOYMENT);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index 6b3257d870..547bc8bea3 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -63,6 +63,8 @@ public class CloudToLocalStorageCachingTask implements
INCLifecycleTask {
IPartitionBootstrapper bootstrapper =
applicationContext.getPartitionBootstrapper();
bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(),
metadataNode, metadataPartitionId, cleanup,
latestCheckpoint == null);
+ // Report all local resources
+
applicationContext.getDiskCacheService().reportLocalResources(lrs.loadAndGetAllResources());
}
@Override
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index ba3ffcd5de..cb8b8e5bb7 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -61,7 +61,7 @@ import org.apache.asterix.app.config.ConfigValidator;
import org.apache.asterix.app.io.PersistedResourceRegistry;
import org.apache.asterix.app.replication.NcLifecycleCoordinator;
import org.apache.asterix.app.result.JobResultCallback;
-import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.CloudConfigurator;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.INamespacePathResolver;
@@ -186,8 +186,7 @@ public class CCApplication extends BaseCCApplication {
CloudProperties cloudProperties = null;
if (cloudDeployment) {
cloudProperties = new
CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
- ioManager =
- (IOManager)
CloudManagerProvider.createIOManager(cloudProperties, ioManager,
namespacePathResolver);
+ ioManager = CloudConfigurator.createIOManager(ioManager,
cloudProperties, namespacePathResolver);
}
IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
appCtx = createApplicationContext(null, globalRecoveryManager,
lifecycleCoordinator, Receptionist::new,
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 6888accc37..4c06994354 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -11,13 +11,19 @@
"azure.request.timeout" : 120,
"cloud.deployment" : false,
"cloud.profiler.log.interval" : 0,
+ "cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
"cloud.storage.cache.policy" : "lazy",
+ "cloud.storage.debug.mode.enabled" : false,
+ "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+ "cloud.storage.disk.monitor.interval" : 60,
"cloud.storage.endpoint" : "",
+ "cloud.storage.index.inactive.duration.threshold" : 6,
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
+ "cloud.storage.sweep.threshold.percentage" : 0.9,
"compiler\.arrayindex" : true,
"compiler.batch.lookup" : true,
"compiler.cbo" : true,
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 31daf991a3..1d64adeb01 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -11,13 +11,19 @@
"azure.request.timeout" : 120,
"cloud.deployment" : false,
"cloud.profiler.log.interval" : 0,
+ "cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
"cloud.storage.cache.policy" : "lazy",
+ "cloud.storage.debug.mode.enabled" : false,
+ "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+ "cloud.storage.disk.monitor.interval" : 60,
"cloud.storage.endpoint" : "",
+ "cloud.storage.index.inactive.duration.threshold" : 6,
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
+ "cloud.storage.sweep.threshold.percentage" : 0.9,
"compiler\.arrayindex" : true,
"compiler.batch.lookup" : true,
"compiler.cbo" : true,
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index a47b3be23b..aff2c778fe 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -11,13 +11,19 @@
"azure.request.timeout" : 120,
"cloud.deployment" : false,
"cloud.profiler.log.interval" : 0,
+ "cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
"cloud.storage.cache.policy" : "lazy",
+ "cloud.storage.debug.mode.enabled" : false,
+ "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+ "cloud.storage.disk.monitor.interval" : 60,
"cloud.storage.endpoint" : "",
+ "cloud.storage.index.inactive.duration.threshold" : 6,
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
+ "cloud.storage.sweep.threshold.percentage" : 0.9,
"compiler\.arrayindex" : true,
"compiler.batch.lookup" : true,
"compiler.cbo" : true,
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
new file mode 100644
index 0000000000..a0c1fbbd26
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.asterix.common.api.INamespacePathResolver;
+import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext;
+import org.apache.hyracks.cloud.buffercache.page.CloudDiskCachedPageAllocator;
+import
org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService;
+import
org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier;
+import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread;
+import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import
org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
+import
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.DummyPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import
org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public final class CloudConfigurator {
+ private final CloudProperties cloudProperties;
+ private final IOManager localIoManager;
+ private final AbstractCloudIOManager cloudIOManager;
+ private final IPhysicalDrive physicalDrive;
+ private final IDiskResourceCacheLockNotifier lockNotifier;
+ private final IDiskCachedPageAllocator pageAllocator;
+ private final IBufferCacheReadContext defaultContext;
+ private final boolean diskCacheManagerRequired;
+ private final long diskCacheMonitoringInterval;
+
+ private CloudConfigurator(CloudProperties cloudProperties, IIOManager
ioManager,
+ INamespacePathResolver nsPathResolver) throws HyracksDataException
{
+ this.cloudProperties = cloudProperties;
+ localIoManager = (IOManager) ioManager;
+ diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() ==
CloudCachePolicy.SELECTIVE;
+ cloudIOManager = createIOManager(ioManager, cloudProperties,
nsPathResolver);
+ physicalDrive = createPhysicalDrive(diskCacheManagerRequired,
cloudProperties, ioManager);
+ lockNotifier = createLockNotifier(diskCacheManagerRequired);
+ pageAllocator = createPageAllocator(diskCacheManagerRequired);
+ defaultContext =
createDefaultBufferCachePageOpContext(diskCacheManagerRequired, physicalDrive);
+ diskCacheMonitoringInterval =
cloudProperties.getStorageDiskMonitorInterval();
+ }
+
+ public IPartitionBootstrapper getPartitionBootstrapper() {
+ return cloudIOManager;
+ }
+
+ public IIOManager getCloudIoManager() {
+ return cloudIOManager;
+ }
+
+ public IDiskResourceCacheLockNotifier getLockNotifier() {
+ return lockNotifier;
+ }
+
+ public IDiskCachedPageAllocator getPageAllocator() {
+ return pageAllocator;
+ }
+
+ public IBufferCacheReadContext getDefaultContext() {
+ return defaultContext;
+ }
+
+ public IDiskCacheMonitoringService
createDiskCacheMonitoringService(INCServiceContext serviceContext,
+ IBufferCache bufferCache, Map<Integer, BufferedFileHandle>
fileInfoMap) {
+ if (!diskCacheManagerRequired) {
+ return NoOpDiskCacheMonitoringService.INSTANCE;
+ }
+
+ CloudDiskResourceCacheLockNotifier resourceCacheManager =
(CloudDiskResourceCacheLockNotifier) lockNotifier;
+ BufferCache diskBufferCache = (BufferCache) bufferCache;
+ int numOfIoDevices = localIoManager.getIODevices().size();
+ IApplicationConfig appConfig = serviceContext.getAppConfig();
+ int ioParallelism =
appConfig.getInt(NCConfig.Option.IO_WORKERS_PER_PARTITION);
+ int sweepQueueSize = appConfig.getInt(NCConfig.Option.IO_QUEUE_SIZE);
+ int numOfSweepThreads = ioParallelism * numOfIoDevices;
+ // Ensure at least each sweep thread has one entry in the queue
+ int maxSweepQueueSize = Math.max(numOfSweepThreads, sweepQueueSize);
+ long inactiveThreshold =
cloudProperties.getStorageIndexInactiveDurationThreshold();
+ // +1 for the monitorThread
+ ExecutorService executor =
Executors.newFixedThreadPool(numOfSweepThreads + 1);
+ DiskCacheSweeperThread monitorThread = new
DiskCacheSweeperThread(executor, diskCacheMonitoringInterval,
+ resourceCacheManager, cloudIOManager, numOfSweepThreads,
maxSweepQueueSize, physicalDrive,
+ diskBufferCache, fileInfoMap, inactiveThreshold);
+
+ IDiskCacheMonitoringService diskCacheService =
+ new CloudDiskCacheMonitoringAndPrefetchingService(executor,
physicalDrive, monitorThread);
+ localIoManager.setSpaceMaker(monitorThread);
+ return diskCacheService;
+ }
+
+ public static CloudConfigurator of(CloudProperties cloudProperties,
IIOManager ioManager,
+ INamespacePathResolver nsPathResolver) throws HyracksDataException
{
+ return new CloudConfigurator(cloudProperties, ioManager,
nsPathResolver);
+ }
+
+ public static AbstractCloudIOManager createIOManager(IIOManager ioManager,
CloudProperties cloudProperties,
+ INamespacePathResolver nsPathResolver) throws HyracksDataException
{
+ IOManager localIoManager = (IOManager) ioManager;
+ CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
+ if (policy == CloudCachePolicy.EAGER) {
+ return new EagerCloudIOManager(localIoManager, cloudProperties,
nsPathResolver);
+ }
+
+ boolean selective = policy == CloudCachePolicy.SELECTIVE;
+ return new LazyCloudIOManager(localIoManager, cloudProperties,
nsPathResolver, selective);
+ }
+
+ private static IPhysicalDrive createPhysicalDrive(boolean
diskCacheManagerRequired, CloudProperties cloudProperties,
+ IIOManager ioManager) throws HyracksDataException {
+ if (diskCacheManagerRequired) {
+ double storagePercentage =
cloudProperties.getStorageAllocationPercentage();
+ double pressureThreshold =
cloudProperties.getStorageSweepThresholdPercentage();
+ long pressureDebugSize =
cloudProperties.getStorageDebugSweepThresholdSize();
+ return new PhysicalDrive(ioManager.getIODevices(),
pressureThreshold, storagePercentage, pressureDebugSize);
+ }
+
+ return DummyPhysicalDrive.INSTANCE;
+ }
+
+ private static IDiskResourceCacheLockNotifier createLockNotifier(boolean
diskCacheManagerRequired) {
+ if (diskCacheManagerRequired) {
+ return new
CloudDiskResourceCacheLockNotifier(StorageConstants.METADATA_PARTITION);
+ }
+
+ return NoOpDiskResourceCacheLockNotifier.INSTANCE;
+ }
+
+ private static IDiskCachedPageAllocator createPageAllocator(boolean
diskCacheManagerRequired) {
+ if (diskCacheManagerRequired) {
+ return CloudDiskCachedPageAllocator.INSTANCE;
+ }
+ return DefaultDiskCachedPageAllocator.INSTANCE;
+ }
+
+ private static IBufferCacheReadContext
createDefaultBufferCachePageOpContext(boolean diskCacheManagerRequired,
+ IPhysicalDrive drive) {
+ if (diskCacheManagerRequired) {
+ return new DefaultCloudReadContext(drive);
+ }
+
+ return DefaultBufferCacheReadContextProvider.DEFAULT;
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 04979e6810..c993cd31e6 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -102,7 +102,7 @@ final class EagerCloudIOManager extends
AbstractCloudIOManager {
}
@Override
- public void evict(FileReference directory) {
+ public void evict(String resourcePath) {
throw new UnsupportedOperationException("evict is not supported with
Eager caching");
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 612237a96b..afe08780ac 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -68,11 +68,11 @@ final class LazyCloudIOManager extends
AbstractCloudIOManager {
private ILazyAccessor accessor;
public LazyCloudIOManager(IOManager ioManager, CloudProperties
cloudProperties,
- INamespacePathResolver nsPathResolver, boolean
replaceableAccessor) throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, boolean selective) throws
HyracksDataException {
super(ioManager, cloudProperties, nsPathResolver);
accessor = new InitialCloudAccessor(cloudClient, bucket,
localIoManager);
puncher = HolePuncherProvider.get(this, cloudProperties,
writeBufferProvider);
- if (replaceableAccessor) {
+ if (selective) {
replacer = InitialCloudAccessor.NO_OP_REPLACER;
} else {
replacer = () -> {
@@ -208,8 +208,8 @@ final class LazyCloudIOManager extends
AbstractCloudIOManager {
}
@Override
- public void evict(FileReference directory) throws HyracksDataException {
- accessor.doEvict(directory);
+ public void evict(String resourcePath) throws HyracksDataException {
+ accessor.doEvict(resolve(resourcePath));
}
private List<FileReference> resolve(Set<CloudFile> cloudFiles) throws
HyracksDataException {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index 7539aa731d..a3079e6da5 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -180,8 +180,8 @@ public final class ParallelCacher implements
IParallelCacher {
@Override
public synchronized void add(Collection<FileReference> files) {
LOGGER.info("Uncache {}", files);
+ // We only can 'uncache' data files
uncachedDataFiles.putAll(getFiles(files, DATA_FILTER));
- uncachedMetadataFiles.putAll(getFiles(files, METADATA_FILTER));
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
index 934468aab8..218015d5d4 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.cloud.lazy.accessor;
+import static org.apache.asterix.cloud.util.CloudFileUtil.DATA_FILTER;
+
import java.util.Collection;
import java.util.Set;
@@ -50,10 +52,21 @@ public class SelectiveCloudAccessor extends
ReplaceableCloudAccessor {
throw new IllegalStateException(directory + " is not a directory");
}
- // TODO only delete data files?
- Collection<FileReference> uncachedFiles =
UncachedFileReference.toUncached(localIoManager.list(directory));
+ // Get a list of all data files
+ Collection<FileReference> files = localIoManager.list(directory,
DATA_FILTER);
+ if (files.isEmpty()) {
+ // Nothing to evict
+ return;
+ }
+
+ // Convert file references to uncached ones
+ Collection<FileReference> uncachedFiles =
UncachedFileReference.toUncached(files);
+ // Add all data files to the cacher to indicate they are in a
'cacheable' state (i.e., not downloaded)
cacher.add(uncachedFiles);
- localIoManager.delete(directory);
+ // Delete all data files from the local drive
+ for (FileReference uncachedFile : uncachedFiles) {
+ localIoManager.delete(uncachedFile);
+ }
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
index 4ea6c464e3..406f2f30b5 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
@@ -18,17 +18,21 @@
*/
package org.apache.asterix.cloud.lazy.filesystem;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.asterix.cloud.AbstractCloudIOManager;
+import org.apache.asterix.cloud.CloudFileHandle;
import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.common.cloud.CloudCachePolicy;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.filesystem.FileSystemOperationDispatcherUtil;
public final class HolePuncherProvider {
private static final IHolePuncher UNSUPPORTED =
HolePuncherProvider::unsupported;
+ private static final IHolePuncher LINUX =
HolePuncherProvider::linuxPunchHole;
private HolePuncherProvider() {
}
@@ -39,13 +43,35 @@ public final class HolePuncherProvider {
return UNSUPPORTED;
}
- return new DebugHolePuncher(cloudIOManager, bufferProvider);
+ if (FileSystemOperationDispatcherUtil.isLinux()) {
+ return LINUX;
+ } else if (cloudProperties.isStorageDebugModeEnabled()) {
+ // Running on debug mode on a non-Linux box
+ return new DebugHolePuncher(cloudIOManager, bufferProvider);
+ }
+
+ throw new UnsupportedOperationException(
+ "Hole puncher is not supported using " +
FileSystemOperationDispatcherUtil.getOSName());
}
private static int unsupported(IFileHandle fileHandle, long offset, long
length) {
throw new UnsupportedOperationException("punchHole is not supported");
}
+ private static int linuxPunchHole(IFileHandle fileHandle, long offset,
long length) throws HyracksDataException {
+ CloudFileHandle cloudFileHandle = (CloudFileHandle) fileHandle;
+ int fileDescriptor = cloudFileHandle.getFileDescriptor();
+ int blockSize = cloudFileHandle.getBlockSize();
+ int freedSpace =
FileSystemOperationDispatcherUtil.punchHole(fileDescriptor, offset, length,
blockSize);
+ try {
+ cloudFileHandle.getFileChannel().force(false);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+
+ return freedSpace;
+ }
+
private static final class DebugHolePuncher implements IHolePuncher {
private final AbstractCloudIOManager cloudIOManager;
private final IWriteBufferProvider bufferProvider;
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8e6becf672..888cea1186 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -43,6 +43,7 @@ import
org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import org.apache.hyracks.util.cache.ICacheManager;
@@ -152,4 +153,9 @@ public interface INcApplicationContext extends
IApplicationContext {
* @return the disk write rate limiter provider
*/
IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider();
+
+ /**
+ * @return disk cache service
+ */
+ IDiskCacheMonitoringService getDiskCacheService();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 4b7649a86e..5c23f5cce4 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -19,8 +19,12 @@
package org.apache.asterix.common.config;
import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
+import static
org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
import static
org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
+import static
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.GIGABYTE;
import java.util.concurrent.TimeUnit;
@@ -28,6 +32,7 @@ import org.apache.asterix.common.cloud.CloudCachePolicy;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.util.StorageUtil;
public class CloudProperties extends AbstractProperties {
@@ -43,6 +48,14 @@ public class CloudProperties extends AbstractProperties {
CLOUD_STORAGE_ENDPOINT(STRING, ""),
CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false),
CLOUD_STORAGE_CACHE_POLICY(STRING, "lazy"),
+ // 80% of the total disk space
+ CLOUD_STORAGE_ALLOCATION_PERCENTAGE(DOUBLE, 0.8d),
+ // 90% of the allocated space for storage (i.e., 90% of the 80% of the
total disk space)
+ CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE(DOUBLE, 0.9d),
+ CLOUD_STORAGE_DISK_MONITOR_INTERVAL(POSITIVE_INTEGER, 60),
+ CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD(POSITIVE_INTEGER, 6),
+ CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, false),
+ CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT,
StorageUtil.getLongSizeInBytes(1, GIGABYTE)),
CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);
private final IOptionType interpreter;
@@ -63,6 +76,12 @@ public class CloudProperties extends AbstractProperties {
case CLOUD_STORAGE_ENDPOINT:
case CLOUD_STORAGE_ANONYMOUS_AUTH:
case CLOUD_STORAGE_CACHE_POLICY:
+ case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+ case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+ case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+ case CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD:
+ case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
+ case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
case CLOUD_PROFILER_LOG_INTERVAL:
return Section.COMMON;
default:
@@ -86,9 +105,36 @@ public class CloudProperties extends AbstractProperties {
case CLOUD_STORAGE_ANONYMOUS_AUTH:
return "Indicates whether or not anonymous auth should be
used for the cloud storage";
case CLOUD_STORAGE_CACHE_POLICY:
- return "The caching policy (either eager or lazy). 'Eager'
caching will download all partitions"
- + " upon booting, whereas lazy caching will
download a file upon request to open it."
+ return "The caching policy (either eager, lazy or
selective). 'eager' caching will download"
+ + "all partitions upon booting, whereas 'lazy'
caching will download a file upon"
+ + " request to open it. 'selective' caching will
act as the 'lazy' policy; however, "
+ + " it allows to use the local disk(s) as a cache,
where pages and indexes can be "
+ + " cached or evicted according to the pressure
imposed on the local disks."
+ " (default: 'lazy')";
+ case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+ return "The percentage of the total disk space that should
be allocated for data storage when the"
+ + " 'selective' caching policy is used. The
remaining will act as a buffer for "
+ + " query workspace (i.e., for query operations
that require spilling to disk)."
+ + " (default: 80% of the total disk space)";
+ case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+ return "The percentage of the used storage space at which
the disk sweeper starts freeing space by"
+ + " punching holes in stored indexes or by
evicting them entirely, "
+ + " when the 'selective' caching policy is used."
+ + " (default: 90% of the allocated space for
storage)";
+ case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+ return "The disk monitoring interval time (in seconds):
determines how often the system"
+ + " checks for pressure on disk space when using
the 'selective' caching policy."
+ + " (default : 60 seconds)";
+ case CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD:
+ return "The duration in minutes to consider an index is
inactive. (default: 360 or 6 hours)";
+ case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
+ return "Whether or not the debug mode is enabled when
using the 'selective' caching policy."
+ + "(default: false)";
+ case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
+ return "For debugging only. Pressure size will be the
current used space + the additional bytes"
+ + " provided by this configuration option instead
of using "
+ + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE."
+ + " (default: 0. I.e.,
CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE will be used by default)";
case CLOUD_PROFILER_LOG_INTERVAL:
return "The waiting time (in minutes) to log cloud request
statistics (default: 0, which means"
+ " the profiler is disabled by default). The
minimum is 1 minute."
@@ -138,6 +184,31 @@ public class CloudProperties extends AbstractProperties {
return
CloudCachePolicy.fromName(accessor.getString(Option.CLOUD_STORAGE_CACHE_POLICY));
}
+ public double getStorageAllocationPercentage() {
+ return accessor.getDouble(Option.CLOUD_STORAGE_ALLOCATION_PERCENTAGE);
+ }
+
+ public double getStorageSweepThresholdPercentage() {
+ return
accessor.getDouble(Option.CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE);
+ }
+
+ public int getStorageDiskMonitorInterval() {
+ return accessor.getInt(Option.CLOUD_STORAGE_DISK_MONITOR_INTERVAL);
+ }
+
+ public long getStorageIndexInactiveDurationThreshold() {
+ int minutes =
accessor.getInt(Option.CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD);
+ return TimeUnit.MINUTES.toNanos(minutes);
+ }
+
+ public boolean isStorageDebugModeEnabled() {
+ return accessor.getBoolean(Option.CLOUD_STORAGE_DEBUG_MODE_ENABLED);
+ }
+
+ public long getStorageDebugSweepThresholdSize() {
+ return isStorageDebugModeEnabled() ?
accessor.getLong(Option.CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE) : 0L;
+ }
+
public long getProfilerLogInterval() {
long interval =
TimeUnit.MINUTES.toNanos(accessor.getInt(Option.CLOUD_PROFILER_LOG_INTERVAL));
return interval == 0 ? 0 : Math.max(interval,
TimeUnit.MINUTES.toNanos(1));
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 1a4515570a..07801a90a8 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -59,6 +59,7 @@ import
org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -71,6 +72,7 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
private final IVirtualBufferCache vbc;
private final ILogManager logManager;
private final LogRecord waitLog;
+ private final IDiskResourceCacheLockNotifier lockNotifier;
private volatile boolean stopped = false;
private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
// all LSM-trees share the same virtual buffer cache list
@@ -78,7 +80,8 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
public DatasetLifecycleManager(StorageProperties storageProperties,
ILocalResourceRepository resourceRepository,
ILogManager logManager, IVirtualBufferCache vbc,
- IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
int numPartitions) {
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
+ IDiskResourceCacheLockNotifier lockNotifier) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
@@ -89,6 +92,7 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
vbcs.add(vbc);
}
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+ this.lockNotifier = lockNotifier;
waitLog = new LogRecord();
waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
waitLog.computeAndSetLogSize();
@@ -122,6 +126,7 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
datasetResource = getDatasetLifecycle(did);
}
datasetResource.register(resource, (ILSMIndex) index);
+ lockNotifier.onRegister(resource, index,
datasetResource.getIndexInfo(resource.getId()).getPartition());
}
private int getDIDfromResourcePath(String resourcePath) throws
HyracksDataException {
@@ -145,7 +150,6 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
validateDatasetLifecycleManagerState();
int did = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
-
DatasetResource dsr = datasets.get(did);
IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
@@ -153,6 +157,7 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST,
resourcePath);
}
+ lockNotifier.onUnregister(resourceID);
PrimaryIndexOperationTracker opTracker =
dsr.getOpTracker(iInfo.getPartition());
if (iInfo.getReferenceCount() != 0 || (opTracker != null &&
opTracker.getNumActiveOperations() != 0)) {
if (LOGGER.isErrorEnabled()) {
@@ -190,6 +195,9 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
int did = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
+ // Notify first before opening a resource
+ lockNotifier.onOpen(resourceID);
+
DatasetResource dsr = datasets.get(did);
DatasetInfo dsInfo = dsr.getDatasetInfo();
if (dsInfo == null || !dsInfo.isRegistered()) {
@@ -253,6 +261,7 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
if (iInfo == null) {
throw
HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID,
resourceID);
}
+ lockNotifier.onClose(resourceID);
} finally {
// Regardless of what exception is thrown in the try-block (e.g.,
line 279),
// we have to un-touch the index and dataset.
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
index 04f594ee39..620417be19 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
@@ -30,6 +30,7 @@ import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProv
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import com.fasterxml.jackson.databind.JsonNode;
@@ -45,32 +46,32 @@ public class RuntimeComponentsProvider implements
IStorageManager, ILSMIOOperati
@Override
public ILSMIOOperationScheduler getIoScheduler(INCServiceContext ctx) {
- return ((INcApplicationContext)
ctx.getApplicationContext()).getLSMIOScheduler();
+ return getAppCtx(ctx).getLSMIOScheduler();
}
@Override
public IIOManager getIoManager(INCServiceContext ctx) {
- return ((INcApplicationContext)
ctx.getApplicationContext()).getPersistenceIoManager();
+ return getAppCtx(ctx).getPersistenceIoManager();
}
@Override
public IBufferCache getBufferCache(INCServiceContext ctx) {
- return ((INcApplicationContext)
ctx.getApplicationContext()).getBufferCache();
+ return getAppCtx(ctx).getBufferCache();
}
@Override
public ILocalResourceRepository
getLocalResourceRepository(INCServiceContext ctx) {
- return ((INcApplicationContext)
ctx.getApplicationContext()).getLocalResourceRepository();
+ return getAppCtx(ctx).getLocalResourceRepository();
}
@Override
public IDatasetLifecycleManager getLifecycleManager(INCServiceContext ctx)
{
- return ((INcApplicationContext)
ctx.getApplicationContext()).getDatasetLifecycleManager();
+ return getAppCtx(ctx).getDatasetLifecycleManager();
}
@Override
public IResourceIdFactory getResourceIdFactory(INCServiceContext ctx) {
- return ((INcApplicationContext)
ctx.getApplicationContext()).getResourceIdFactory();
+ return getAppCtx(ctx).getResourceIdFactory();
}
@Override
@@ -78,6 +79,16 @@ public class RuntimeComponentsProvider implements
IStorageManager, ILSMIOOperati
return registry.getClassIdentifier(getClass(), serialVersionUID);
}
+ @Override
+ public IDiskCacheMonitoringService
getDiskCacheMonitoringService(INCServiceContext ctx) {
+ return getAppCtx(ctx).getDiskCacheService();
+
+ }
+
+ private INcApplicationContext getAppCtx(INCServiceContext ctx) {
+ return ((INcApplicationContext) ctx.getApplicationContext());
+ }
+
@SuppressWarnings("squid:S1172") // unused parameter
public static IJsonSerializable fromJson(IPersistedResourceRegistry
registry, JsonNode json) {
return RUNTIME_PROVIDER;
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
index 381d5ebe30..ba513facd9 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
@@ -18,9 +18,11 @@
*/
package org.apache.hyracks.cloud.cache.service;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
import org.apache.hyracks.storage.common.disk.prefetch.AbstractPrefetchRequest;
@@ -54,6 +56,11 @@ public final class
CloudDiskCacheMonitoringAndPrefetchingService
return true;
}
+ @Override
+ public void reportLocalResources(Map<Long, LocalResource> localResources) {
+ monitorThread.reportLocalResources(localResources);
+ }
+
@Override
public IPhysicalDrive getPhysicalDrive() {
return drive;
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
index 55bab43087..036a812bb1 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
@@ -18,67 +18,151 @@
*/
package org.apache.hyracks.cloud.cache.service;
-import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hyracks.cloud.cache.unit.AbstractIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-// TODO locking should be revised
public final class CloudDiskResourceCacheLockNotifier implements
IDiskResourceCacheLockNotifier {
- private final Int2ObjectMap<DatasetUnit> datasets;
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final int metadataPartition;
+ private final Long2ObjectMap<LocalResource> inactiveResources;
+ private final Long2ObjectMap<UnsweepableIndexUnit> unsweepableIndexes;
+ private final Long2ObjectMap<SweepableIndexUnit> sweepableIndexes;
+ private final ReentrantReadWriteLock evictionLock;
- public CloudDiskResourceCacheLockNotifier() {
- datasets = Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
+ public CloudDiskResourceCacheLockNotifier(int metadataPartition) {
+ this.metadataPartition = metadataPartition;
+ inactiveResources = Long2ObjectMaps.synchronize(new
Long2ObjectOpenHashMap<>());
+ unsweepableIndexes = Long2ObjectMaps.synchronize(new
Long2ObjectOpenHashMap<>());
+ sweepableIndexes = Long2ObjectMaps.synchronize(new
Long2ObjectOpenHashMap<>());
+ evictionLock = new ReentrantReadWriteLock();
}
@Override
- public void onRegister(int datasetId, LocalResource localResource, IIndex
index) {
+ public void onRegister(LocalResource localResource, IIndex index, int
partition) {
ILSMIndex lsmIndex = (ILSMIndex) index;
- if (lsmIndex.getDiskCacheManager().isSweepable()) {
- DatasetUnit datasetUnit = datasets.computeIfAbsent(datasetId,
DatasetUnit::new);
- datasetUnit.addIndex(localResource.getId(), lsmIndex);
+ evictionLock.readLock().lock();
+ try {
+ if (partition != metadataPartition) {
+ long resourceId = localResource.getId();
+ if (lsmIndex.getDiskCacheManager().isSweepable()) {
+ sweepableIndexes.put(resourceId, new
SweepableIndexUnit(localResource, lsmIndex));
+ } else {
+ unsweepableIndexes.put(resourceId, new
UnsweepableIndexUnit(localResource));
+ }
+ }
+ inactiveResources.remove(localResource.getId());
+ } finally {
+ evictionLock.readLock().unlock();
}
}
@Override
- public void onUnregister(int datasetId, long resourceId) {
- DatasetUnit datasetUnit = datasets.get(datasetId);
- if (datasetUnit != null && datasetUnit.dropIndex(resourceId)) {
- datasets.remove(datasetId);
+ public void onUnregister(long resourceId) {
+ evictionLock.readLock().lock();
+ try {
+ AbstractIndexUnit indexUnit = getUnit(resourceId);
+ if (indexUnit != null) {
+ indexUnit.drop();
+ } else {
+ inactiveResources.remove(resourceId);
+ }
+ } finally {
+ evictionLock.readLock().unlock();
+ }
+ }
- // TODO invalidate eviction plans if the disk is not pressured
+ private AbstractIndexUnit getUnit(long resourceId) {
+ AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId);
+ if (indexUnit == null) {
+ indexUnit = unsweepableIndexes.get(resourceId);
}
+ return indexUnit;
}
@Override
- public void onOpen(int datasetId, long resourceId) {
- DatasetUnit datasetUnit = datasets.get(datasetId);
- if (datasetUnit != null) {
- IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
- if (indexUnit != null) {
- indexUnit.readLock();
+ public void onOpen(long resourceId) {
+ evictionLock.readLock().lock();
+ try {
+ AbstractIndexUnit indexUnit = getUnit(resourceId);
+ if (indexUnit == null) {
+ // Metadata resource
+ return;
}
+ indexUnit.open();
+ } finally {
+ evictionLock.readLock().unlock();
}
}
@Override
- public void onClose(int datasetId, long resourceId) {
- DatasetUnit datasetUnit = datasets.get(datasetId);
- if (datasetUnit != null) {
- IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
- if (indexUnit != null) {
- indexUnit.readUnlock();
+ public void onClose(long resourceId) {
+ evictionLock.readLock().lock();
+ try {
+ AbstractIndexUnit indexUnit = getUnit(resourceId);
+ if (indexUnit == null) {
+ // Metadata resource
+ return;
+ }
+ indexUnit.close();
+ } finally {
+ evictionLock.readLock().unlock();
+ }
+
+ }
+
+ ReentrantReadWriteLock getEvictionLock() {
+ return evictionLock;
+ }
+
+ void reportLocalResources(Map<Long, LocalResource> localResources) {
+ inactiveResources.clear();
+ // First check whatever we had already
+ for (LocalResource lr : localResources.values()) {
+ if (unsweepableIndexes.containsKey(lr.getId()) ||
sweepableIndexes.containsKey(lr.getId())) {
+ // We already have this resource
+ continue;
}
+
+ // Probably a new resource or an old resource that wasn't
registered before
+ inactiveResources.put(lr.getId(), lr);
}
+
+ removeUnassignedResources(unsweepableIndexes, localResources);
+ removeUnassignedResources(sweepableIndexes, localResources);
+
+ LOGGER.info("Retained active {unsweepable: {}, sweepable: {}} and
inactive: {}", unsweepableIndexes,
+ sweepableIndexes, inactiveResources.values().stream()
+ .map(x -> "(id: " + x.getId() + ", path: " +
x.getPath() + ")").toList());
+ }
+
+ private void removeUnassignedResources(Long2ObjectMap<?> indexes,
Map<Long, LocalResource> localResources) {
+ indexes.long2ObjectEntrySet().removeIf(x ->
!localResources.containsKey(x.getLongKey()));
+ }
+
+ Collection<LocalResource> getInactiveResources() {
+ return inactiveResources.values();
+ }
+
+ Collection<UnsweepableIndexUnit> getUnsweepableIndexes() {
+ return unsweepableIndexes.values();
}
- Int2ObjectMap<DatasetUnit> getDatasets() {
- return datasets;
+ void getSweepableIndexes(Collection<SweepableIndexUnit> indexes) {
+ indexes.addAll(sweepableIndexes.values());
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
index 79b978e33d..f594e06784 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.cloud.cache.service;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -27,11 +28,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IDiskSpaceMaker;
-import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
import org.apache.hyracks.cloud.io.ICloudIOManager;
import org.apache.hyracks.cloud.sweeper.ISweeper;
import org.apache.hyracks.cloud.sweeper.Sweeper;
+import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -44,21 +46,29 @@ public class DiskCacheSweeperThread implements Runnable,
IDiskSpaceMaker {
private final long waitTime;
private final CloudDiskResourceCacheLockNotifier resourceManager;
private final IPhysicalDrive physicalDrive;
- private final List<IndexUnit> indexes;
+ private final List<SweepableIndexUnit> indexes;
+ private final ICloudIOManager cloudIOManager;
private final ISweeper sweeper;
+ private final long inactiveTimeThreshold;
public DiskCacheSweeperThread(ExecutorService executorService, long
waitTime,
CloudDiskResourceCacheLockNotifier resourceManager,
ICloudIOManager cloudIOManager, int numOfSweepThreads,
int sweepQueueSize, IPhysicalDrive physicalDrive, BufferCache
bufferCache,
- Map<Integer, BufferedFileHandle> fileInfoMap) {
+ Map<Integer, BufferedFileHandle> fileInfoMap, long
inactiveTimeThreshold) {
this.waitTime = TimeUnit.SECONDS.toMillis(waitTime);
this.resourceManager = resourceManager;
this.physicalDrive = physicalDrive;
+ this.inactiveTimeThreshold = inactiveTimeThreshold;
indexes = new ArrayList<>();
+ this.cloudIOManager = cloudIOManager;
sweeper = new Sweeper(executorService, cloudIOManager, bufferCache,
fileInfoMap, numOfSweepThreads,
sweepQueueSize);
}
+ public void reportLocalResources(Map<Long, LocalResource> localResources) {
+ resourceManager.reportLocalResources(localResources);
+ }
+
@Override
public void makeSpaceOrThrow(IOException ioException) throws
HyracksDataException {
if (ioException.getMessage().contains("no space")) {
@@ -83,7 +93,7 @@ public class DiskCacheSweeperThread implements Runnable,
IDiskSpaceMaker {
while (true) {
synchronized (this) {
try {
- sweep();
+ makeSpace();
wait(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -94,20 +104,66 @@ public class DiskCacheSweeperThread implements Runnable,
IDiskSpaceMaker {
}
}
- private void sweep() {
+ private void makeSpace() {
if (physicalDrive.computeAndCheckIsPressured()) {
- for (DatasetUnit dataset : resourceManager.getDatasets().values())
{
- indexes.clear();
- dataset.getIndexes(indexes);
- sweepIndexes(sweeper, indexes);
+ boolean shouldSweep;
+ resourceManager.getEvictionLock().writeLock().lock();
+ try {
+ shouldSweep = evictInactive();
+ } finally {
+ resourceManager.getEvictionLock().writeLock().unlock();
+ }
+
+ if (shouldSweep) {
+ // index eviction didn't help. Sweep!
+ sweep();
+ }
+ }
+ }
+
+ private boolean evictInactive() {
+ long now = System.nanoTime();
+ Collection<LocalResource> inactiveResources =
resourceManager.getInactiveResources();
+ Collection<UnsweepableIndexUnit> unsweepableIndexes =
resourceManager.getUnsweepableIndexes();
+ if (inactiveResources.isEmpty() && unsweepableIndexes.isEmpty()) {
+ // return true to run sweep as nothing will be evicted
+ return true;
+ }
+
+ // First evict all resources that were never been registered
+ for (LocalResource resource : inactiveResources) {
+ try {
+ cloudIOManager.evict(resource.getPath());
+ } catch (HyracksDataException e) {
+ LOGGER.error("Failed to evict resource " + resource.getPath(),
e);
+ }
+ }
+
+ // Next evict all inactive indexes
+ for (UnsweepableIndexUnit index : unsweepableIndexes) {
+ if (now - index.getLastAccessTime() >= inactiveTimeThreshold) {
+ try {
+ cloudIOManager.evict(index.getPath());
+ } catch (HyracksDataException e) {
+ LOGGER.error("Failed to evict resource " +
index.getPath(), e);
+ }
}
}
+
+ // If disk is still pressured, proceed with sweep
+ return physicalDrive.computeAndCheckIsPressured();
+ }
+
+ private void sweep() {
+ indexes.clear();
+ resourceManager.getSweepableIndexes(indexes);
+ sweepIndexes(sweeper, indexes);
}
@CriticalPath
- private static void sweepIndexes(ISweeper sweeper, List<IndexUnit>
indexes) {
+ private static void sweepIndexes(ISweeper sweeper,
List<SweepableIndexUnit> indexes) {
for (int i = 0; i < indexes.size(); i++) {
- IndexUnit index = indexes.get(i);
+ SweepableIndexUnit index = indexes.get(i);
if (!index.isSweeping()) {
try {
sweeper.sweep(index);
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java
new file mode 100644
index 0000000000..153234009e
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.unit;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
+public abstract class AbstractIndexUnit {
+ protected final LocalResource localResource;
+ private final AtomicLong lastAccessTime;
+ private final AtomicInteger openCounter;
+
+ AbstractIndexUnit(LocalResource localResource) {
+ this.localResource = localResource;
+ this.lastAccessTime = new AtomicLong(0);
+ this.openCounter = new AtomicInteger(0);
+ }
+
+ public final void open() {
+ lastAccessTime.set(System.nanoTime());
+ openCounter.get();
+ }
+
+ public final void close() {
+ openCounter.decrementAndGet();
+ }
+
+ public final long getLastAccessTime() {
+ return lastAccessTime.get();
+ }
+
+ public abstract void drop();
+
+ @Override
+ public String toString() {
+ return "(id: " + localResource.getId() + ", path: " +
localResource.getPath() + "sweepable: " + isSweepable()
+ + ")";
+ }
+
+ protected abstract boolean isSweepable();
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
deleted file mode 100644
index 34b37fbff9..0000000000
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.cloud.cache.unit;
-
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-
-public final class DatasetUnit {
- private final int id;
- private final ReentrantReadWriteLock lock;
- /**
- * Maps resourceId to {@link IndexUnit}
- */
- private final Long2ObjectMap<IndexUnit> indexes;
-
- public DatasetUnit(int datasetId) {
- id = datasetId;
- lock = new ReentrantReadWriteLock();
- indexes = new Long2ObjectOpenHashMap<>();
-
- }
-
- public int getId() {
- return id;
- }
-
- public IndexUnit addIndex(long resourceId, ILSMIndex index) {
- writeLock();
- try {
- IndexUnit indexUnit = new IndexUnit(resourceId, index);
- indexes.put(resourceId, indexUnit);
- return indexUnit;
- } finally {
- writeUnlock();
- }
- }
-
- public boolean dropIndex(long resourceId) {
- IndexUnit indexUnit = indexes.remove(resourceId);
- // Signal that the index is being dropped so a sweeper thread does not
sweep this index or stops sweeping
- indexUnit.setDropped();
- // Wait for the sweep operation (if running) before allowing the index
to be dropped
- indexUnit.waitForSweep();
- return indexUnit.getIndex().isPrimaryIndex();
- }
-
- public IndexUnit getIndex(long resourceId) {
- readLock();
- try {
- return indexes.get(resourceId);
- } finally {
- readUnlock();
- }
- }
-
- /**
- * Return the current indexes
- *
- * @param indexUnits container used to return the current indexes
- */
- public void getIndexes(List<IndexUnit> indexUnits) {
- readLock();
- try {
- indexUnits.addAll(indexes.values());
- } finally {
- readUnlock();
- }
- }
-
- private void readLock() {
- lock.readLock().lock();
- }
-
- private void readUnlock() {
- lock.readLock().unlock();
- }
-
- private void writeLock() {
- lock.writeLock().lock();
- }
-
- private void writeUnlock() {
- lock.writeLock().unlock();
- }
-}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
similarity index 77%
rename from
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
rename to
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
index 8f8b412604..794cafc6ff 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
@@ -19,37 +19,38 @@
package org.apache.hyracks.cloud.cache.unit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.LocalResource;
-// TODO allow evicting an index entirely
-public final class IndexUnit {
- private final long id;
+public final class SweepableIndexUnit extends AbstractIndexUnit {
private final ILSMIndex index;
private final AtomicBoolean dropped;
private final AtomicBoolean sweeping;
- private final AtomicInteger readCounter;
- public IndexUnit(long resourceId, ILSMIndex index) {
- this.id = resourceId;
+ public SweepableIndexUnit(LocalResource localResource, ILSMIndex index) {
+ super(localResource);
this.index = index;
dropped = new AtomicBoolean(false);
sweeping = new AtomicBoolean(false);
- readCounter = new AtomicInteger(0);
}
- public long getId() {
- return id;
+ @Override
+ public void drop() {
+ // Signal that the index is being dropped so a sweeper thread does not
sweep this index or stops sweeping
+ dropped.set(false);
+ // Wait for the sweep operation (if running) before allowing the index
to be dropped
+ waitForSweep();
}
- public ILSMIndex getIndex() {
- return index;
+ @Override
+ protected boolean isSweepable() {
+ return true;
}
- public void setDropped() {
- dropped.set(false);
+ public ILSMIndex getIndex() {
+ return index;
}
public boolean isDropped() {
@@ -79,13 +80,4 @@ public final class IndexUnit {
sweeping.notifyAll();
}
}
-
- public void readLock() {
- readCounter.incrementAndGet();
- }
-
- public void readUnlock() {
- readCounter.decrementAndGet();
- }
-
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java
similarity index 65%
copy from
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
copy to
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java
index 0410e3bf13..204fe68c7a 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java
@@ -16,18 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.cloud.sweeper;
+package org.apache.hyracks.cloud.cache.unit;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.storage.common.LocalResource;
-public final class NoOpSweeper implements ISweeper {
- public static final ISweeper INSTANCE = new NoOpSweeper();
-
- private NoOpSweeper() {
+public final class UnsweepableIndexUnit extends AbstractIndexUnit {
+ public UnsweepableIndexUnit(LocalResource localResource) {
+ super(localResource);
}
@Override
- public void sweep(IndexUnit indexUnit) {
+ public void drop() {
// NoOp
}
+
+ @Override
+ protected boolean isSweepable() {
+ return false;
+ }
+
+ public String getPath() {
+ return localResource.getPath();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 83b7629b6a..0dca41765b 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -22,7 +22,6 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
/**
@@ -86,9 +85,9 @@ public interface ICloudIOManager {
int punchHole(IFileHandle fHandle, long offset, long length) throws
HyracksDataException;
/**
- * Evict a directory from the local disk cache
+ * Evict a resource from the local disk cache
*
- * @param directory to evict
+ * @param resourcePath to evict
*/
- void evict(FileReference directory) throws HyracksDataException;
+ void evict(String resourcePath) throws HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
index d064fda10f..9067a3fba6 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.cloud.sweeper;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
/**
@@ -31,5 +31,5 @@ public interface ISweeper {
*
* @param indexUnit to sweep
*/
- void sweep(IndexUnit indexUnit) throws InterruptedException;
+ void sweep(SweepableIndexUnit indexUnit) throws InterruptedException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
index 0410e3bf13..ca103abd15 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.cloud.sweeper;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
public final class NoOpSweeper implements ISweeper {
public static final ISweeper INSTANCE = new NoOpSweeper();
@@ -27,7 +27,7 @@ public final class NoOpSweeper implements ISweeper {
}
@Override
- public void sweep(IndexUnit indexUnit) {
+ public void sweep(SweepableIndexUnit indexUnit) {
// NoOp
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
index 79fdb5a5b2..c709cc6f78 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
import org.apache.hyracks.cloud.io.ICloudIOManager;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -36,7 +36,7 @@ public final class SweepContext implements ISweepContext {
private final BufferCache bufferCache;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
private final AtomicBoolean shutdown;
- private IndexUnit indexUnit;
+ private SweepableIndexUnit indexUnit;
private BufferedFileHandle handle;
public SweepContext(ICloudIOManager cloudIOManager, BufferCache
bufferCache,
@@ -74,11 +74,11 @@ public final class SweepContext implements ISweepContext {
bufferCache.unpin(page, bcOpCtx);
}
- public void setIndexUnit(IndexUnit indexUnit) {
+ public void setIndexUnit(SweepableIndexUnit indexUnit) {
this.indexUnit = indexUnit;
}
- public IndexUnit getIndexUnit() {
+ public SweepableIndexUnit getIndexUnit() {
return indexUnit;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
index e5d368c8fc..245c95793b 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.InvokeUtil;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
import org.apache.hyracks.cloud.io.ICloudIOManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
@@ -56,7 +56,7 @@ public final class Sweeper implements ISweeper {
}
@Override
- public void sweep(IndexUnit indexUnit) throws InterruptedException {
+ public void sweep(SweepableIndexUnit indexUnit) throws
InterruptedException {
SweepRequest request = freeRequests.take();
request.reset(indexUnit);
requests.put(request);
@@ -131,7 +131,7 @@ public final class Sweeper implements ISweeper {
this.context = context;
}
- void reset(IndexUnit indexUnit) {
+ void reset(SweepableIndexUnit indexUnit) {
context.setIndexUnit(indexUnit);
}
@@ -151,7 +151,7 @@ public final class Sweeper implements ISweeper {
*/
return;
}
- IndexUnit indexUnit = context.getIndexUnit();
+ SweepableIndexUnit indexUnit = context.getIndexUnit();
indexUnit.startSweeping();
try {
ILSMIndex index = indexUnit.getIndex();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index b9f277abf3..9909e97a5d 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -45,6 +45,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOBulkOperation;
@@ -82,6 +83,8 @@ public class IOManager implements IIOManager {
* Mutables
*/
private int workspaceIndex;
+ // TODO use space make on write
+ private IDiskSpaceMaker spaceMaker;
public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver
deviceComputer, int ioParallelism, int queueSize)
throws HyracksDataException {
@@ -112,6 +115,7 @@ public class IOManager implements IIOManager {
for (int i = 0; i < numIoThreads; i++) {
executor.execute(new IoRequestHandler(i, submittedRequests));
}
+ spaceMaker = NoOpDiskSpaceMaker.INSTANCE;
}
public int getQueueSize() {
@@ -596,4 +600,8 @@ public class IOManager implements IIOManager {
public void performBulkOperation(IIOBulkOperation bulkOperation) throws
HyracksDataException {
((AbstractBulkOperation) bulkOperation).performOperation();
}
+
+ public void setSpaceMaker(IDiskSpaceMaker spaceMaker) {
+ this.spaceMaker = spaceMaker;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
similarity index 63%
copy from
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
copy to
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
index 0410e3bf13..1527dde053 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
@@ -16,18 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.cloud.sweeper;
+package org.apache.hyracks.control.nc.io;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import java.io.IOException;
-public final class NoOpSweeper implements ISweeper {
- public static final ISweeper INSTANCE = new NoOpSweeper();
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
- private NoOpSweeper() {
+final class NoOpDiskSpaceMaker implements IDiskSpaceMaker {
+ static final IDiskSpaceMaker INSTANCE = new NoOpDiskSpaceMaker();
+
+ private NoOpDiskSpaceMaker() {
}
@Override
- public void sweep(IndexUnit indexUnit) {
- // NoOp
+ public void makeSpaceOrThrow(IOException e) throws HyracksDataException {
+ throw HyracksDataException.create(e);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
index bf870b1756..426e81db8f 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
@@ -26,6 +26,8 @@ import
org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.ResourceIdFactory;
public class BTreeHelperStorageManager implements IStorageManager {
@@ -60,4 +62,9 @@ public class BTreeHelperStorageManager implements
IStorageManager {
public IResourceLifecycleManager<IIndex>
getLifecycleManager(INCServiceContext ctx) {
return RuntimeContext.get(ctx).getIndexLifecycleManager();
}
+
+ @Override
+ public IDiskCacheMonitoringService
getDiskCacheMonitoringService(INCServiceContext ctx) {
+ return NoOpDiskCacheMonitoringService.INSTANCE;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
index 4932bf0c7f..9fc3b8d064 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -27,7 +27,7 @@ import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
import org.apache.hyracks.cloud.sweeper.SweepContext;
import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
@@ -59,7 +59,7 @@ public final class ColumnSweeper {
public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector
sweepProjector)
throws HyracksDataException {
- IndexUnit indexUnit = context.getIndexUnit();
+ SweepableIndexUnit indexUnit = context.getIndexUnit();
LSMColumnBTree lsmColumnBTree = (LSMColumnBTree) indexUnit.getIndex();
IColumnProjectionInfo projectionInfo =
captureSweepableComponents(lsmColumnBTree, sweepProjector);
if (projectionInfo == null) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
index a0b592a5e1..7ea4e353a8 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -45,6 +45,7 @@ import
org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import
org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import org.apache.hyracks.storage.common.IStorageManager;
import
org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -87,13 +88,14 @@ public class LSMColumnBTreeLocalResource extends
LSMBTreeLocalResource {
List<IVirtualBufferCache> vbcs =
vbcProvider.getVirtualBufferCaches(serviceCtx, file);
ioOpCallbackFactory.initialize(serviceCtx, this);
pageWriteCallbackFactory.initialize(serviceCtx, this);
+ IDiskCacheMonitoringService diskCacheService =
storageManager.getDiskCacheMonitoringService(serviceCtx);
return LSMColumnBTreeUtil.createLSMTree(ioManager, vbcs, file,
storageManager.getBufferCache(serviceCtx),
typeTraits, cmpFactories, bloomFilterKeyFields,
bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
serviceCtx),
opTrackerProvider.getOperationTracker(serviceCtx, this),
ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields,
metadataPageManagerFactory, false,
serviceCtx.getTracer(), compressorDecompressorFactory,
nullTypeTraits, nullIntrospector,
- columnManagerFactory, atomic);
+ columnManagerFactory, atomic, diskCacheService);
}
public static IJsonSerializable fromJson(IPersistedResourceRegistry
registry, JsonNode json)
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 039aaedbf3..6335acd21a 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -34,6 +34,7 @@ import
org.apache.hyracks.storage.am.common.api.INullIntrospector;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager;
import
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManagerFactory;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.CloudColumnIndexDiskCacheManager;
import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.IColumnIndexDiskCacheManager;
import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.NoOpColumnIndexDiskCacheManager;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
@@ -55,6 +56,7 @@ import
org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.util.trace.ITracer;
public class LSMColumnBTreeUtil {
@@ -66,13 +68,18 @@ public class LSMColumnBTreeUtil {
ILSMIOOperationCallbackFactory ioOpCallbackFactory,
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
int[] btreeFields, IMetadataPageManagerFactory
freePageManagerFactory, boolean updateAware, ITracer tracer,
ICompressorDecompressorFactory compressorDecompressorFactory,
ITypeTraits nullTypeTraits,
- INullIntrospector nullIntrospector, IColumnManagerFactory
columnManagerFactory, boolean atomic)
- throws HyracksDataException {
+ INullIntrospector nullIntrospector, IColumnManagerFactory
columnManagerFactory, boolean atomic,
+ IDiskCacheMonitoringService diskCacheService) throws
HyracksDataException {
+
// Initialize managers
+ boolean diskCacheEnabled = diskCacheService.isEnabled();
IColumnManager columnManager =
columnManagerFactory.createColumnManager();
- IColumnIndexDiskCacheManager diskCacheManager =
NoOpColumnIndexDiskCacheManager.INSTANCE;
+ IColumnIndexDiskCacheManager diskCacheManager = diskCacheEnabled
+ ? new
CloudColumnIndexDiskCacheManager(columnManager.getNumberOfPrimaryKeys(),
+ columnManager.getMergeColumnProjector(),
diskCacheService.getPhysicalDrive())
+ : NoOpColumnIndexDiskCacheManager.INSTANCE;
- //Tuple writers
+ // Tuple writers
LSMBTreeTupleWriterFactory insertTupleWriterFactory = new
LSMBTreeTupleWriterFactory(typeTraits,
cmpFactories.length, false, updateAware, nullTypeTraits,
nullIntrospector);
LSMBTreeTupleWriterFactory deleteTupleWriterFactory = new
LSMBTreeTupleWriterFactory(typeTraits,
@@ -82,7 +89,7 @@ public class LSMColumnBTreeUtil {
LSMBTreeTupleWriterFactory bulkLoadTupleWriterFactory = new
LSMBTreeTupleWriterFactory(typeTraits,
cmpFactories.length, false, updateAware, nullTypeTraits,
nullIntrospector);
- //Leaf frames
+ // Leaf frames
ITreeIndexFrameFactory flushLeafFrameFactory = new
ColumnBTreeLeafFrameFactory(copyTupleWriterFactory,
columnManagerFactory.getFlushColumnTupleReaderWriterFactory());
ITreeIndexFrameFactory mergeLeafFrameFactory = new
ColumnBTreeLeafFrameFactory(copyTupleWriterFactory,
@@ -93,7 +100,7 @@ public class LSMColumnBTreeUtil {
ITreeIndexFrameFactory deleteLeafFrameFactory = new
BTreeNSMLeafFrameFactory(deleteTupleWriterFactory);
ITreeIndexFrameFactory interiorFrameFactory = new
BTreeNSMInteriorFrameFactory(insertTupleWriterFactory);
- //BTree factory
+ // BTree factory
TreeIndexFactory<ColumnBTree> flushBTreeFactory = new
ColumnBTreeFactory(ioManager, diskBufferCache,
freePageManagerFactory, interiorFrameFactory,
flushLeafFrameFactory, cmpFactories, typeTraits.length);
TreeIndexFactory<ColumnBTree> mergeBTreeFactory = new
ColumnBTreeFactory(ioManager, diskBufferCache,
@@ -102,8 +109,8 @@ public class LSMColumnBTreeUtil {
new ColumnBTreeFactory(ioManager, diskBufferCache,
freePageManagerFactory, interiorFrameFactory,
bulkLoadLeafFrameFactory, cmpFactories,
typeTraits.length);
- ILSMIndexFileManager fileNameManager =
- new LSMBTreeFileManager(ioManager, file, flushBTreeFactory,
true, compressorDecompressorFactory);
+ ILSMIndexFileManager fileNameManager = new
LSMBTreeFileManager(ioManager, file, flushBTreeFactory, true,
+ compressorDecompressorFactory, diskCacheEnabled);
BloomFilterFactory bloomFilterFactory = new
BloomFilterFactory(diskBufferCache, bloomFilterKeyFields);
ILSMDiskComponentFactory flushComponentFactory =
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 98b82db8be..e2e49e3d47 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -45,18 +45,20 @@ public class LSMBTreeFileManager extends
AbstractLSMIndexFileManager {
(dir, name) -> !name.startsWith(".") &&
name.endsWith(BTREE_SUFFIX);
private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
private final boolean hasBloomFilter;
+ private final boolean allowHoles;
+
+ public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
+ TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean
hasBloomFilter) {
+ this(ioManager, file, btreeFactory, hasBloomFilter,
NoOpCompressorDecompressorFactory.INSTANCE, false);
+ }
public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean
hasBloomFilter,
- ICompressorDecompressorFactory compressorDecompressorFactory) {
+ ICompressorDecompressorFactory compressorDecompressorFactory,
boolean allowHoles) {
super(ioManager, file, null, compressorDecompressorFactory);
this.btreeFactory = btreeFactory;
this.hasBloomFilter = hasBloomFilter;
- }
-
- public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
- TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean
hasBloomFilter) {
- this(ioManager, file, btreeFactory, hasBloomFilter,
NoOpCompressorDecompressorFactory.INSTANCE);
+ this.allowHoles = allowHoles;
}
@Override
@@ -180,4 +182,9 @@ public class LSMBTreeFileManager extends
AbstractLSMIndexFileManager {
return validFiles;
}
+
+ @Override
+ protected boolean areHolesAllowed() {
+ return allowHoles;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index 3106b2924c..6bd236c191 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -122,7 +122,7 @@ public class LSMBTreeUtil {
filterManager = new LSMComponentFilterManager(filterFrameFactory);
}
ILSMIndexFileManager fileNameManager = new
LSMBTreeFileManager(ioManager, file, diskBTreeFactory,
- hasBloomFilter, compressorDecompressorFactory);
+ hasBloomFilter, compressorDecompressorFactory, false);
ILSMDiskComponentFactory componentFactory;
ILSMDiskComponentFactory bulkLoadComponentFactory;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
index 9d0880e9fa..47492e8dab 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IJsonSerializable;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
/**
@@ -59,4 +60,10 @@ public interface IStorageManager extends Serializable,
IJsonSerializable {
* @return the resource lifecycle manager
*/
IResourceLifecycleManager<IIndex> getLifecycleManager(INCServiceContext
ctx);
+
+ /**
+ * @param ctx the nc service context
+ * @return disk cache monitoring service
+ */
+ IDiskCacheMonitoringService
getDiskCacheMonitoringService(INCServiceContext ctx);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
index 5fb7088c67..b957782d14 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
@@ -18,6 +18,10 @@
*/
package org.apache.hyracks.storage.common.disk;
+import java.util.Map;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
/**
* Disk cache monitoring service is responsible for monitor the local drives
*/
@@ -37,6 +41,13 @@ public interface IDiskCacheMonitoringService {
*/
boolean isEnabled();
+ /**
+ * Report all local resources
+ *
+ * @param localResources local resources
+ */
+ void reportLocalResources(Map<Long, LocalResource> localResources);
+
/**
* @return physical drive
*/
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
index fc4645d925..1e986b3e47 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
@@ -31,34 +31,31 @@ public interface IDiskResourceCacheLockNotifier {
* Notify registering a new resource
* Note: this method is not thread-safe outside {@link
org.apache.hyracks.storage.common.IResourceLifecycleManager}
*
- * @param datasetId dataset ID
* @param localResource resource to be registered
* @param index of the resource
+ * @param partition partition
*/
- void onRegister(int datasetId, LocalResource localResource, IIndex index);
+ void onRegister(LocalResource localResource, IIndex index, int partition);
/**
* Notify unregistering an existing resource
* Note: this method is not thread-safe outside {@link
org.apache.hyracks.storage.common.IResourceLifecycleManager}
*
- * @param datasetId dataset ID
* @param resourceId resource ID
*/
- void onUnregister(int datasetId, long resourceId);
+ void onUnregister(long resourceId);
/**
* Notify opening a resource
*
- * @param datasetId dataset ID
* @param resourceId resource ID
*/
- void onOpen(int datasetId, long resourceId);
+ void onOpen(long resourceId);
/**
* Notify closing a resource
*
- * @param datasetId dataset ID
* @param resourceId resource ID
*/
- void onClose(int datasetId, long resourceId);
+ void onClose(long resourceId);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
index 9a7a5d693c..d6155a4d0f 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
@@ -18,6 +18,10 @@
*/
package org.apache.hyracks.storage.common.disk;
+import java.util.Map;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
public final class NoOpDiskCacheMonitoringService implements
IDiskCacheMonitoringService {
public static final IDiskCacheMonitoringService INSTANCE = new
NoOpDiskCacheMonitoringService();
@@ -39,6 +43,11 @@ public final class NoOpDiskCacheMonitoringService implements
IDiskCacheMonitorin
return false;
}
+ @Override
+ public void reportLocalResources(Map<Long, LocalResource> localResources) {
+ // NoOp
+ }
+
@Override
public IPhysicalDrive getPhysicalDrive() {
return DummyPhysicalDrive.INSTANCE;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
index 3c0d6c998a..b83c388466 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
@@ -28,22 +28,22 @@ public final class NoOpDiskResourceCacheLockNotifier
implements IDiskResourceCac
}
@Override
- public void onRegister(int datasetId, LocalResource localResource, IIndex
index) {
+ public void onRegister(LocalResource localResource, IIndex index, int
partition) {
// NoOp
}
@Override
- public void onUnregister(int datasetId, long resourceId) {
+ public void onUnregister(long resourceId) {
// NoOp
}
@Override
- public void onOpen(int datasetId, long resourceId) {
+ public void onOpen(long resourceId) {
// NoOp
}
@Override
- public void onClose(int datasetId, long resourceId) {
+ public void onClose(long resourceId) {
// NoOp
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
index e54ce2c399..0b4d2ed72b 100644
---
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
@@ -26,6 +26,8 @@ import
org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.ResourceIdFactory;
public class TestStorageManager implements IStorageManager {
@@ -60,4 +62,9 @@ public class TestStorageManager implements IStorageManager {
return TestStorageManagerComponentHolder.getIndexLifecycleManager();
}
+ @Override
+ public IDiskCacheMonitoringService
getDiskCacheMonitoringService(INCServiceContext ctx) {
+ return NoOpDiskCacheMonitoringService.INSTANCE;
+ }
+
}