This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 42c18e65fb [ASTERIXDB-3387][STO] Enable Selective caching policy
42c18e65fb is described below

commit 42c18e65fb44113f3bee8a3ce7ac38259967e4de
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Wed May 15 16:08:04 2024 -0700

    [ASTERIXDB-3387][STO] Enable Selective caching policy
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: no
    
    Details:
    Allow the user to enable/select the Selective caching policy
    
    Change-Id: Ia212db8c1673f6b725eb21fe7035a1e20490d1f6
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18291
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |  47 ++++--
 .../nc/task/CloudToLocalStorageCachingTask.java    |   2 +
 .../asterix/hyracks/bootstrap/CCApplication.java   |   5 +-
 .../api/cluster_state_1/cluster_state_1.1.regexadm |   6 +
 .../cluster_state_1_full.1.regexadm                |   6 +
 .../cluster_state_1_less.1.regexadm                |   6 +
 .../apache/asterix/cloud/CloudConfigurator.java    | 180 +++++++++++++++++++++
 .../apache/asterix/cloud/EagerCloudIOManager.java  |   2 +-
 .../apache/asterix/cloud/LazyCloudIOManager.java   |   8 +-
 .../apache/asterix/cloud/lazy/ParallelCacher.java  |   2 +-
 .../lazy/accessor/SelectiveCloudAccessor.java      |  19 ++-
 .../cloud/lazy/filesystem/HolePuncherProvider.java |  28 +++-
 .../asterix/common/api/INcApplicationContext.java  |   6 +
 .../asterix/common/config/CloudProperties.java     |  75 ++++++++-
 .../common/context/DatasetLifecycleManager.java    |  13 +-
 .../runtime/utils/RuntimeComponentsProvider.java   |  23 ++-
 ...udDiskCacheMonitoringAndPrefetchingService.java |   7 +
 .../CloudDiskResourceCacheLockNotifier.java        | 148 +++++++++++++----
 .../cache/service/DiskCacheSweeperThread.java      |  80 +++++++--
 .../cloud/cache/unit/AbstractIndexUnit.java        |  59 +++++++
 .../hyracks/cloud/cache/unit/DatasetUnit.java      | 106 ------------
 .../{IndexUnit.java => SweepableIndexUnit.java}    |  38 ++---
 .../unit/UnsweepableIndexUnit.java}                |  22 ++-
 .../apache/hyracks/cloud/io/ICloudIOManager.java   |   7 +-
 .../org/apache/hyracks/cloud/sweeper/ISweeper.java |   4 +-
 .../apache/hyracks/cloud/sweeper/NoOpSweeper.java  |   4 +-
 .../apache/hyracks/cloud/sweeper/SweepContext.java |   8 +-
 .../org/apache/hyracks/cloud/sweeper/Sweeper.java  |   8 +-
 .../apache/hyracks/control/nc/io/IOManager.java    |   8 +
 .../hyracks/control/nc/io/NoOpDiskSpaceMaker.java} |  17 +-
 .../btree/helper/BTreeHelperStorageManager.java    |   7 +
 .../btree/column/cloud/sweep/ColumnSweeper.java    |   4 +-
 .../dataflow/LSMColumnBTreeLocalResource.java      |   4 +-
 .../lsm/btree/column/utils/LSMColumnBTreeUtil.java |  23 ++-
 .../am/lsm/btree/impls/LSMBTreeFileManager.java    |  19 ++-
 .../storage/am/lsm/btree/utils/LSMBTreeUtil.java   |   2 +-
 .../hyracks/storage/common/IStorageManager.java    |   7 +
 .../common/disk/IDiskCacheMonitoringService.java   |  11 ++
 .../disk/IDiskResourceCacheLockNotifier.java       |  13 +-
 .../disk/NoOpDiskCacheMonitoringService.java       |   9 ++
 .../disk/NoOpDiskResourceCacheLockNotifier.java    |   8 +-
 .../hyracks/test/support/TestStorageManager.java   |   7 +
 42 files changed, 793 insertions(+), 265 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index cd52b9dd90..1d6a0d8978 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.CloudConfigurator;
 import org.apache.asterix.cloud.LocalPartitionBootstrapper;
 import org.apache.asterix.common.api.IConfigValidator;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -122,6 +122,10 @@ import 
org.apache.hyracks.storage.common.buffercache.IPageCleanerPolicy;
 import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
 import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import 
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import 
org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.storage.common.file.FileMapManager;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
@@ -180,6 +184,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     private IPartitionBootstrapper partitionBootstrapper;
     private final INamespacePathResolver namespacePathResolver;
     private final INamespaceResolver namespaceResolver;
+    private IDiskCacheMonitoringService diskCacheService;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, 
NCExtensionManager extensionManager,
             IPropertiesFactory propertiesFactory, INamespaceResolver 
namespaceResolver,
@@ -211,16 +216,26 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
             IConfigValidatorFactory configValidatorFactory, 
IReplicationStrategyFactory replicationStrategyFactory,
             boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
-        IDiskCachedPageAllocator pageAllocator = 
DefaultDiskCachedPageAllocator.INSTANCE;
-        IBufferCacheReadContext defaultContext = 
DefaultBufferCacheReadContextProvider.DEFAULT;
+        CloudConfigurator cloudConfigurator;
+        IDiskResourceCacheLockNotifier lockNotifier;
+        IDiskCachedPageAllocator pageAllocator;
+        IBufferCacheReadContext defaultContext;
         if (isCloudDeployment()) {
-            persistenceIOManager =
-                    CloudManagerProvider.createIOManager(cloudProperties, 
ioManager, namespacePathResolver);
-            partitionBootstrapper = 
CloudManagerProvider.getCloudPartitionBootstrapper(persistenceIOManager);
+            cloudConfigurator = CloudConfigurator.of(cloudProperties, 
ioManager, namespacePathResolver);
+            persistenceIOManager = cloudConfigurator.getCloudIoManager();
+            partitionBootstrapper = 
cloudConfigurator.getPartitionBootstrapper();
+            lockNotifier = cloudConfigurator.getLockNotifier();
+            pageAllocator = cloudConfigurator.getPageAllocator();
+            defaultContext = cloudConfigurator.getDefaultContext();
         } else {
+            cloudConfigurator = null;
             persistenceIOManager = ioManager;
             partitionBootstrapper = new LocalPartitionBootstrapper(ioManager);
+            lockNotifier = NoOpDiskResourceCacheLockNotifier.INSTANCE;
+            pageAllocator = DefaultDiskCachedPageAllocator.INSTANCE;
+            defaultContext = DefaultBufferCacheReadContextProvider.DEFAULT;
         }
+
         int ioQueueLen = 
getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
         threadExecutor =
                 
MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
@@ -261,9 +276,8 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         // Must start vbc now instead of by life cycle component manager 
(lccm) because lccm happens after
         // the metadata bootstrap task
         ((ILifeCycleComponent) virtualBufferCache).start();
-        datasetLifecycleManager =
-                new DatasetLifecycleManager(storageProperties, 
localResourceRepository, txnSubsystem.getLogManager(),
-                        virtualBufferCache, indexCheckpointManagerProvider, 
ioManager.getIODevices().size());
+        datasetLifecycleManager = new 
DatasetLifecycleManager(storageProperties, localResourceRepository,
+                txnSubsystem.getLogManager(), virtualBufferCache, 
indexCheckpointManagerProvider, lockNotifier);
         
localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
         final String nodeId = getServiceContext().getNodeId();
         final Set<Integer> nodePartitions = 
metadataProperties.getNodePartitions(nodeId);
@@ -300,6 +314,15 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
                     fileInfoMap, defaultContext);
         }
 
+        if (cloudConfigurator != null) {
+            diskCacheService =
+                    
cloudConfigurator.createDiskCacheMonitoringService(getServiceContext(), 
bufferCache, fileInfoMap);
+        } else {
+            diskCacheService = NoOpDiskCacheMonitoringService.INSTANCE;
+        }
+
+        diskCacheService.start();
+
         NodeControllerService ncs = (NodeControllerService) 
getServiceContext().getControllerService();
         FileReference appDir =
                 
ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
@@ -351,6 +374,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     @Override
     public synchronized void preStop() throws Exception {
         activeManager.shutdown();
+        diskCacheService.stop();
         if (metadataNodeStub != null) {
             unexportMetadataNodeStub();
         }
@@ -694,6 +718,11 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         return diskWriteRateLimiterProvider;
     }
 
+    @Override
+    public IDiskCacheMonitoringService getDiskCacheService() {
+        return diskCacheService;
+    }
+
     @Override
     public boolean isCloudDeployment() {
         return ncServiceContext.getAppConfig().getBoolean(CLOUD_DEPLOYMENT);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index 6b3257d870..547bc8bea3 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -63,6 +63,8 @@ public class CloudToLocalStorageCachingTask implements 
INCLifecycleTask {
         IPartitionBootstrapper bootstrapper = 
applicationContext.getPartitionBootstrapper();
         bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), 
metadataNode, metadataPartitionId, cleanup,
                 latestCheckpoint == null);
+        // Report all local resources
+        
applicationContext.getDiskCacheService().reportLocalResources(lrs.loadAndGetAllResources());
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index ba3ffcd5de..cb8b8e5bb7 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -61,7 +61,7 @@ import org.apache.asterix.app.config.ConfigValidator;
 import org.apache.asterix.app.io.PersistedResourceRegistry;
 import org.apache.asterix.app.replication.NcLifecycleCoordinator;
 import org.apache.asterix.app.result.JobResultCallback;
-import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.CloudConfigurator;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
 import org.apache.asterix.common.api.INamespacePathResolver;
@@ -186,8 +186,7 @@ public class CCApplication extends BaseCCApplication {
         CloudProperties cloudProperties = null;
         if (cloudDeployment) {
             cloudProperties = new 
CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
-            ioManager =
-                    (IOManager) 
CloudManagerProvider.createIOManager(cloudProperties, ioManager, 
namespacePathResolver);
+            ioManager = CloudConfigurator.createIOManager(ioManager, 
cloudProperties, namespacePathResolver);
         }
         IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
         appCtx = createApplicationContext(null, globalRecoveryManager, 
lifecycleCoordinator, Receptionist::new,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 6888accc37..4c06994354 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -11,13 +11,19 @@
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
     "cloud.profiler.log.interval" : 0,
+    "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
     "cloud.storage.cache.policy" : "lazy",
+    "cloud.storage.debug.mode.enabled" : false,
+    "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+    "cloud.storage.disk.monitor.interval" : 60,
     "cloud.storage.endpoint" : "",
+    "cloud.storage.index.inactive.duration.threshold" : 6,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
+    "cloud.storage.sweep.threshold.percentage" : 0.9,
     "compiler\.arrayindex" : true,
     "compiler.batch.lookup" : true,
     "compiler.cbo" : true,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 31daf991a3..1d64adeb01 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -11,13 +11,19 @@
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
     "cloud.profiler.log.interval" : 0,
+    "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
     "cloud.storage.cache.policy" : "lazy",
+    "cloud.storage.debug.mode.enabled" : false,
+    "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+    "cloud.storage.disk.monitor.interval" : 60,
     "cloud.storage.endpoint" : "",
+    "cloud.storage.index.inactive.duration.threshold" : 6,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
+    "cloud.storage.sweep.threshold.percentage" : 0.9,
     "compiler\.arrayindex" : true,
     "compiler.batch.lookup" : true,
     "compiler.cbo" : true,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index a47b3be23b..aff2c778fe 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -11,13 +11,19 @@
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
     "cloud.profiler.log.interval" : 0,
+    "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
     "cloud.storage.cache.policy" : "lazy",
+    "cloud.storage.debug.mode.enabled" : false,
+    "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+    "cloud.storage.disk.monitor.interval" : 60,
     "cloud.storage.endpoint" : "",
+    "cloud.storage.index.inactive.duration.threshold" : 6,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
+    "cloud.storage.sweep.threshold.percentage" : 0.9,
     "compiler\.arrayindex" : true,
     "compiler.batch.lookup" : true,
     "compiler.cbo" : true,
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
new file mode 100644
index 0000000000..a0c1fbbd26
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.asterix.common.api.INamespacePathResolver;
+import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext;
+import org.apache.hyracks.cloud.buffercache.page.CloudDiskCachedPageAllocator;
+import 
org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService;
+import 
org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier;
+import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread;
+import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import 
org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
+import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import 
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.DummyPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import 
org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public final class CloudConfigurator {
+    private final CloudProperties cloudProperties;
+    private final IOManager localIoManager;
+    private final AbstractCloudIOManager cloudIOManager;
+    private final IPhysicalDrive physicalDrive;
+    private final IDiskResourceCacheLockNotifier lockNotifier;
+    private final IDiskCachedPageAllocator pageAllocator;
+    private final IBufferCacheReadContext defaultContext;
+    private final boolean diskCacheManagerRequired;
+    private final long diskCacheMonitoringInterval;
+
+    private CloudConfigurator(CloudProperties cloudProperties, IIOManager 
ioManager,
+            INamespacePathResolver nsPathResolver) throws HyracksDataException 
{
+        this.cloudProperties = cloudProperties;
+        localIoManager = (IOManager) ioManager;
+        diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == 
CloudCachePolicy.SELECTIVE;
+        cloudIOManager = createIOManager(ioManager, cloudProperties, 
nsPathResolver);
+        physicalDrive = createPhysicalDrive(diskCacheManagerRequired, 
cloudProperties, ioManager);
+        lockNotifier = createLockNotifier(diskCacheManagerRequired);
+        pageAllocator = createPageAllocator(diskCacheManagerRequired);
+        defaultContext = 
createDefaultBufferCachePageOpContext(diskCacheManagerRequired, physicalDrive);
+        diskCacheMonitoringInterval = 
cloudProperties.getStorageDiskMonitorInterval();
+    }
+
+    public IPartitionBootstrapper getPartitionBootstrapper() {
+        return cloudIOManager;
+    }
+
+    public IIOManager getCloudIoManager() {
+        return cloudIOManager;
+    }
+
+    public IDiskResourceCacheLockNotifier getLockNotifier() {
+        return lockNotifier;
+    }
+
+    public IDiskCachedPageAllocator getPageAllocator() {
+        return pageAllocator;
+    }
+
+    public IBufferCacheReadContext getDefaultContext() {
+        return defaultContext;
+    }
+
+    public IDiskCacheMonitoringService 
createDiskCacheMonitoringService(INCServiceContext serviceContext,
+            IBufferCache bufferCache, Map<Integer, BufferedFileHandle> 
fileInfoMap) {
+        if (!diskCacheManagerRequired) {
+            return NoOpDiskCacheMonitoringService.INSTANCE;
+        }
+
+        CloudDiskResourceCacheLockNotifier resourceCacheManager = 
(CloudDiskResourceCacheLockNotifier) lockNotifier;
+        BufferCache diskBufferCache = (BufferCache) bufferCache;
+        int numOfIoDevices = localIoManager.getIODevices().size();
+        IApplicationConfig appConfig = serviceContext.getAppConfig();
+        int ioParallelism = 
appConfig.getInt(NCConfig.Option.IO_WORKERS_PER_PARTITION);
+        int sweepQueueSize = appConfig.getInt(NCConfig.Option.IO_QUEUE_SIZE);
+        int numOfSweepThreads = ioParallelism * numOfIoDevices;
+        // Ensure at least each sweep thread has one entry in the queue
+        int maxSweepQueueSize = Math.max(numOfSweepThreads, sweepQueueSize);
+        long inactiveThreshold = 
cloudProperties.getStorageIndexInactiveDurationThreshold();
+        // +1 for the monitorThread
+        ExecutorService executor = 
Executors.newFixedThreadPool(numOfSweepThreads + 1);
+        DiskCacheSweeperThread monitorThread = new 
DiskCacheSweeperThread(executor, diskCacheMonitoringInterval,
+                resourceCacheManager, cloudIOManager, numOfSweepThreads, 
maxSweepQueueSize, physicalDrive,
+                diskBufferCache, fileInfoMap, inactiveThreshold);
+
+        IDiskCacheMonitoringService diskCacheService =
+                new CloudDiskCacheMonitoringAndPrefetchingService(executor, 
physicalDrive, monitorThread);
+        localIoManager.setSpaceMaker(monitorThread);
+        return diskCacheService;
+    }
+
+    public static CloudConfigurator of(CloudProperties cloudProperties, 
IIOManager ioManager,
+            INamespacePathResolver nsPathResolver) throws HyracksDataException 
{
+        return new CloudConfigurator(cloudProperties, ioManager, 
nsPathResolver);
+    }
+
+    public static AbstractCloudIOManager createIOManager(IIOManager ioManager, 
CloudProperties cloudProperties,
+            INamespacePathResolver nsPathResolver) throws HyracksDataException 
{
+        IOManager localIoManager = (IOManager) ioManager;
+        CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
+        if (policy == CloudCachePolicy.EAGER) {
+            return new EagerCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver);
+        }
+
+        boolean selective = policy == CloudCachePolicy.SELECTIVE;
+        return new LazyCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, selective);
+    }
+
+    private static IPhysicalDrive createPhysicalDrive(boolean 
diskCacheManagerRequired, CloudProperties cloudProperties,
+            IIOManager ioManager) throws HyracksDataException {
+        if (diskCacheManagerRequired) {
+            double storagePercentage = 
cloudProperties.getStorageAllocationPercentage();
+            double pressureThreshold = 
cloudProperties.getStorageSweepThresholdPercentage();
+            long pressureDebugSize = 
cloudProperties.getStorageDebugSweepThresholdSize();
+            return new PhysicalDrive(ioManager.getIODevices(), 
pressureThreshold, storagePercentage, pressureDebugSize);
+        }
+
+        return DummyPhysicalDrive.INSTANCE;
+    }
+
+    private static IDiskResourceCacheLockNotifier createLockNotifier(boolean 
diskCacheManagerRequired) {
+        if (diskCacheManagerRequired) {
+            return new 
CloudDiskResourceCacheLockNotifier(StorageConstants.METADATA_PARTITION);
+        }
+
+        return NoOpDiskResourceCacheLockNotifier.INSTANCE;
+    }
+
+    private static IDiskCachedPageAllocator createPageAllocator(boolean 
diskCacheManagerRequired) {
+        if (diskCacheManagerRequired) {
+            return CloudDiskCachedPageAllocator.INSTANCE;
+        }
+        return DefaultDiskCachedPageAllocator.INSTANCE;
+    }
+
+    private static IBufferCacheReadContext 
createDefaultBufferCachePageOpContext(boolean diskCacheManagerRequired,
+            IPhysicalDrive drive) {
+        if (diskCacheManagerRequired) {
+            return new DefaultCloudReadContext(drive);
+        }
+
+        return DefaultBufferCacheReadContextProvider.DEFAULT;
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 04979e6810..c993cd31e6 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -102,7 +102,7 @@ final class EagerCloudIOManager extends 
AbstractCloudIOManager {
     }
 
     @Override
-    public void evict(FileReference directory) {
+    public void evict(String resourcePath) {
         throw new UnsupportedOperationException("evict is not supported with 
Eager caching");
     }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 612237a96b..afe08780ac 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -68,11 +68,11 @@ final class LazyCloudIOManager extends 
AbstractCloudIOManager {
     private ILazyAccessor accessor;
 
     public LazyCloudIOManager(IOManager ioManager, CloudProperties 
cloudProperties,
-            INamespacePathResolver nsPathResolver, boolean 
replaceableAccessor) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, boolean selective) throws 
HyracksDataException {
         super(ioManager, cloudProperties, nsPathResolver);
         accessor = new InitialCloudAccessor(cloudClient, bucket, 
localIoManager);
         puncher = HolePuncherProvider.get(this, cloudProperties, 
writeBufferProvider);
-        if (replaceableAccessor) {
+        if (selective) {
             replacer = InitialCloudAccessor.NO_OP_REPLACER;
         } else {
             replacer = () -> {
@@ -208,8 +208,8 @@ final class LazyCloudIOManager extends 
AbstractCloudIOManager {
     }
 
     @Override
-    public void evict(FileReference directory) throws HyracksDataException {
-        accessor.doEvict(directory);
+    public void evict(String resourcePath) throws HyracksDataException {
+        accessor.doEvict(resolve(resourcePath));
     }
 
     private List<FileReference> resolve(Set<CloudFile> cloudFiles) throws 
HyracksDataException {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index 7539aa731d..a3079e6da5 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -180,8 +180,8 @@ public final class ParallelCacher implements 
IParallelCacher {
     @Override
     public synchronized void add(Collection<FileReference> files) {
         LOGGER.info("Uncache {}", files);
+        // We only can 'uncache' data files
         uncachedDataFiles.putAll(getFiles(files, DATA_FILTER));
-        uncachedMetadataFiles.putAll(getFiles(files, METADATA_FILTER));
     }
 
     @Override
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
index 934468aab8..218015d5d4 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud.lazy.accessor;
 
+import static org.apache.asterix.cloud.util.CloudFileUtil.DATA_FILTER;
+
 import java.util.Collection;
 import java.util.Set;
 
@@ -50,10 +52,21 @@ public class SelectiveCloudAccessor extends 
ReplaceableCloudAccessor {
             throw new IllegalStateException(directory + " is not a directory");
         }
 
-        // TODO only delete data files?
-        Collection<FileReference> uncachedFiles = 
UncachedFileReference.toUncached(localIoManager.list(directory));
+        // Get a list of all data files
+        Collection<FileReference> files = localIoManager.list(directory, 
DATA_FILTER);
+        if (files.isEmpty()) {
+            // Nothing to evict
+            return;
+        }
+
+        // Convert file references to uncached ones
+        Collection<FileReference> uncachedFiles = 
UncachedFileReference.toUncached(files);
+        // Add all data files to the cacher to indicate they are in a 
'cacheable' state (i.e., not downloaded)
         cacher.add(uncachedFiles);
-        localIoManager.delete(directory);
+        // Delete all data files from the local drive
+        for (FileReference uncachedFile : uncachedFiles) {
+            localIoManager.delete(uncachedFile);
+        }
     }
 
     @Override
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
index 4ea6c464e3..406f2f30b5 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
@@ -18,17 +18,21 @@
  */
 package org.apache.asterix.cloud.lazy.filesystem;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.cloud.AbstractCloudIOManager;
+import org.apache.asterix.cloud.CloudFileHandle;
 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.common.cloud.CloudCachePolicy;
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.filesystem.FileSystemOperationDispatcherUtil;
 
 public final class HolePuncherProvider {
     private static final IHolePuncher UNSUPPORTED = 
HolePuncherProvider::unsupported;
+    private static final IHolePuncher LINUX = 
HolePuncherProvider::linuxPunchHole;
 
     private HolePuncherProvider() {
     }
@@ -39,13 +43,35 @@ public final class HolePuncherProvider {
             return UNSUPPORTED;
         }
 
-        return new DebugHolePuncher(cloudIOManager, bufferProvider);
+        if (FileSystemOperationDispatcherUtil.isLinux()) {
+            return LINUX;
+        } else if (cloudProperties.isStorageDebugModeEnabled()) {
+            // Running on debug mode on a non-Linux box
+            return new DebugHolePuncher(cloudIOManager, bufferProvider);
+        }
+
+        throw new UnsupportedOperationException(
+                "Hole puncher is not supported using " + 
FileSystemOperationDispatcherUtil.getOSName());
     }
 
     private static int unsupported(IFileHandle fileHandle, long offset, long 
length) {
         throw new UnsupportedOperationException("punchHole is not supported");
     }
 
+    /**
+     * Hole-punching routine selected by this provider when running on Linux.
+     * <p>
+     * The handle is cast to {@link CloudFileHandle}; its file descriptor and block size are handed to
+     * {@link FileSystemOperationDispatcherUtil#punchHole}, and the handle's file channel is then forced
+     * (content only, {@code force(false)}).
+     *
+     * @param fileHandle handle of the file to punch; must be a {@link CloudFileHandle}
+     * @param offset     start offset of the hole
+     * @param length     length of the hole
+     * @return the value reported by {@code punchHole} (presumably the freed space -- confirm against the utility)
+     * @throws HyracksDataException wrapping any {@link IOException} raised while forcing the channel
+     */
+    private static int linuxPunchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+        CloudFileHandle handle = (CloudFileHandle) fileHandle;
+        int reclaimed = FileSystemOperationDispatcherUtil.punchHole(handle.getFileDescriptor(), offset, length,
+                handle.getBlockSize());
+        try {
+            handle.getFileChannel().force(false);
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+        return reclaimed;
+    }
+
     private static final class DebugHolePuncher implements IHolePuncher {
         private final AbstractCloudIOManager cloudIOManager;
         private final IWriteBufferProvider bufferProvider;
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8e6becf672..888cea1186 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -43,6 +43,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 import org.apache.hyracks.util.cache.ICacheManager;
 
@@ -152,4 +153,9 @@ public interface INcApplicationContext extends 
IApplicationContext {
      * @return the disk write rate limiter provider
      */
     IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider();
+
+    /**
+     * Returns the disk cache monitoring service exposed by this application context.
+     *
+     * @return the {@link IDiskCacheMonitoringService} instance
+     */
+    IDiskCacheMonitoringService getDiskCacheService();
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 4b7649a86e..5c23f5cce4 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -19,8 +19,12 @@
 package org.apache.asterix.common.config;
 
 import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
+import static 
org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
+import static 
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.GIGABYTE;
 
 import java.util.concurrent.TimeUnit;
 
@@ -28,6 +32,7 @@ import org.apache.asterix.common.cloud.CloudCachePolicy;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.util.StorageUtil;
 
 public class CloudProperties extends AbstractProperties {
 
@@ -43,6 +48,14 @@ public class CloudProperties extends AbstractProperties {
         CLOUD_STORAGE_ENDPOINT(STRING, ""),
         CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false),
         CLOUD_STORAGE_CACHE_POLICY(STRING, "lazy"),
+        // 80% of the total disk space
+        CLOUD_STORAGE_ALLOCATION_PERCENTAGE(DOUBLE, 0.8d),
+        // 90% of the allocated space for storage (i.e., 90% of the 80% of the 
total disk space)
+        CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE(DOUBLE, 0.9d),
+        CLOUD_STORAGE_DISK_MONITOR_INTERVAL(POSITIVE_INTEGER, 60),
+        CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD(POSITIVE_INTEGER, 6),
+        CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, false),
+        CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT, 
StorageUtil.getLongSizeInBytes(1, GIGABYTE)),
         CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);
 
         private final IOptionType interpreter;
@@ -63,6 +76,12 @@ public class CloudProperties extends AbstractProperties {
                 case CLOUD_STORAGE_ENDPOINT:
                 case CLOUD_STORAGE_ANONYMOUS_AUTH:
                 case CLOUD_STORAGE_CACHE_POLICY:
+                case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+                case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+                case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+                case CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD:
+                case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
+                case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
                 case CLOUD_PROFILER_LOG_INTERVAL:
                     return Section.COMMON;
                 default:
@@ -86,9 +105,36 @@ public class CloudProperties extends AbstractProperties {
                 case CLOUD_STORAGE_ANONYMOUS_AUTH:
                     return "Indicates whether or not anonymous auth should be 
used for the cloud storage";
                 case CLOUD_STORAGE_CACHE_POLICY:
-                    return "The caching policy (either eager or lazy). 'Eager' 
caching will download all partitions"
-                            + " upon booting, whereas lazy caching will 
download a file upon request to open it."
+                    // Each continuation carries its own single leading space so the concatenated text is
+                    // neither glued together ("downloadall") nor double-spaced.
+                    return "The caching policy (either eager, lazy or selective). 'eager' caching will download"
+                            + " all partitions upon booting, whereas 'lazy' caching will download a file upon"
+                            + " request to open it. 'selective' caching will act as the 'lazy' policy; however,"
+                            + " it allows to use the local disk(s) as a cache, where pages and indexes can be"
+                            + " cached or evicted according to the pressure imposed on the local disks."
                             + " (default: 'lazy')";
+                case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+                    return "The percentage of the total disk space that should be allocated for data storage when the"
+                            + " 'selective' caching policy is used. The remaining will act as a buffer for"
+                            + " query workspace (i.e., for query operations that require spilling to disk)."
+                            + " (default: 80% of the total disk space)";
+                case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+                    return "The percentage of the used storage space at which the disk sweeper starts freeing space by"
+                            + " punching holes in stored indexes or by evicting them entirely,"
+                            + " when the 'selective' caching policy is used."
+                            + " (default: 90% of the allocated space for storage)";
+                case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+                    return "The disk monitoring interval time (in seconds): determines how often the system"
+                            + " checks for pressure on disk space when using the 'selective' caching policy."
+                            + " (default : 60 seconds)";
+                case CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD:
+                    // NOTE(review): the option's default is 6 and the getter converts the value from minutes, so
+                    // the effective default is 6 minutes. If 6 hours was intended, the default should be 360.
+                    return "The duration in minutes to consider an index is inactive. (default: 6 minutes)";
+                case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
+                    return "Whether or not the debug mode is enabled when using the 'selective' caching policy."
+                            + " (default: false)";
+                case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
+                    // The option itself defaults to 1GB; the getter reports 0 whenever debug mode is disabled.
+                    return "For debugging only. Pressure size will be the current used space + the additional bytes"
+                            + " provided by this configuration option instead of using"
+                            + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE."
+                            + " (default: 1GB; only applied when CLOUD_STORAGE_DEBUG_MODE_ENABLED is true, otherwise"
+                            + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE is used)";
                 case CLOUD_PROFILER_LOG_INTERVAL:
                     return "The waiting time (in minutes) to log cloud request 
statistics (default: 0, which means"
                             + " the profiler is disabled by default). The 
minimum is 1 minute."
@@ -138,6 +184,31 @@ public class CloudProperties extends AbstractProperties {
         return 
CloudCachePolicy.fromName(accessor.getString(Option.CLOUD_STORAGE_CACHE_POLICY));
     }
 
+    /**
+     * @return the configured {@code CLOUD_STORAGE_ALLOCATION_PERCENTAGE}: the share of the total disk space
+     *         allocated for data storage under the 'selective' caching policy (option default: 0.8)
+     */
+    public double getStorageAllocationPercentage() {
+        return accessor.getDouble(Option.CLOUD_STORAGE_ALLOCATION_PERCENTAGE);
+    }
+
+    /**
+     * @return the configured {@code CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE}: the share of the allocated
+     *         storage space at which the disk sweeper starts freeing space (option default: 0.9)
+     */
+    public double getStorageSweepThresholdPercentage() {
+        return 
accessor.getDouble(Option.CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE);
+    }
+
+    /**
+     * @return the configured {@code CLOUD_STORAGE_DISK_MONITOR_INTERVAL}, i.e., the disk monitoring interval
+     *         in seconds (option default: 60)
+     */
+    public int getStorageDiskMonitorInterval() {
+        return accessor.getInt(Option.CLOUD_STORAGE_DISK_MONITOR_INTERVAL);
+    }
+
+    /**
+     * Returns the index inactivity threshold. The option is configured in minutes, but the value returned
+     * here is expressed in nanoseconds.
+     *
+     * @return {@code CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD} converted from minutes to nanoseconds
+     */
+    public long getStorageIndexInactiveDurationThreshold() {
+        return TimeUnit.MINUTES.toNanos(accessor.getInt(Option.CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD));
+    }
+
+    /**
+     * @return the configured {@code CLOUD_STORAGE_DEBUG_MODE_ENABLED} flag (option default: false)
+     */
+    public boolean isStorageDebugModeEnabled() {
+        return accessor.getBoolean(Option.CLOUD_STORAGE_DEBUG_MODE_ENABLED);
+    }
+
+    /**
+     * Returns the debug-only sweep threshold size.
+     *
+     * @return {@code 0} when the debug mode is disabled; otherwise the configured
+     *         {@code CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE} in bytes
+     */
+    public long getStorageDebugSweepThresholdSize() {
+        if (!isStorageDebugModeEnabled()) {
+            // The debug threshold is only honored in debug mode
+            return 0L;
+        }
+        return accessor.getLong(Option.CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE);
+    }
+
     public long getProfilerLogInterval() {
         long interval = 
TimeUnit.MINUTES.toNanos(accessor.getInt(Option.CLOUD_PROFILER_LOG_INTERVAL));
         return interval == 0 ? 0 : Math.max(interval, 
TimeUnit.MINUTES.toNanos(1));
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 1a4515570a..07801a90a8 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -59,6 +59,7 @@ import 
org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -71,6 +72,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
     private final IVirtualBufferCache vbc;
     private final ILogManager logManager;
     private final LogRecord waitLog;
+    private final IDiskResourceCacheLockNotifier lockNotifier;
     private volatile boolean stopped = false;
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
     // all LSM-trees share the same virtual buffer cache list
@@ -78,7 +80,8 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
 
     public DatasetLifecycleManager(StorageProperties storageProperties, 
ILocalResourceRepository resourceRepository,
             ILogManager logManager, IVirtualBufferCache vbc,
-            IIndexCheckpointManagerProvider indexCheckpointManagerProvider, 
int numPartitions) {
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
+            IDiskResourceCacheLockNotifier lockNotifier) {
         this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
@@ -89,6 +92,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
             vbcs.add(vbc);
         }
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+        this.lockNotifier = lockNotifier;
         waitLog = new LogRecord();
         waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
         waitLog.computeAndSetLogSize();
@@ -122,6 +126,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
             datasetResource = getDatasetLifecycle(did);
         }
         datasetResource.register(resource, (ILSMIndex) index);
+        lockNotifier.onRegister(resource, index, 
datasetResource.getIndexInfo(resource.getId()).getPartition());
     }
 
     private int getDIDfromResourcePath(String resourcePath) throws 
HyracksDataException {
@@ -145,7 +150,6 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
-
         DatasetResource dsr = datasets.get(did);
         IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
 
@@ -153,6 +157,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
             throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, 
resourcePath);
         }
 
+        lockNotifier.onUnregister(resourceID);
         PrimaryIndexOperationTracker opTracker = 
dsr.getOpTracker(iInfo.getPartition());
         if (iInfo.getReferenceCount() != 0 || (opTracker != null && 
opTracker.getNumActiveOperations() != 0)) {
             if (LOGGER.isErrorEnabled()) {
@@ -190,6 +195,9 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
 
+        // Notify first before opening a resource
+        lockNotifier.onOpen(resourceID);
+
         DatasetResource dsr = datasets.get(did);
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         if (dsInfo == null || !dsInfo.isRegistered()) {
@@ -253,6 +261,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
             if (iInfo == null) {
                 throw 
HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, 
resourceID);
             }
+            lockNotifier.onClose(resourceID);
         } finally {
             // Regardless of what exception is thrown in the try-block (e.g., 
line 279),
             // we have to un-touch the index and dataset.
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
index 04f594ee39..620417be19 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
@@ -30,6 +30,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProv
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -45,32 +46,32 @@ public class RuntimeComponentsProvider implements 
IStorageManager, ILSMIOOperati
 
     @Override
     public ILSMIOOperationScheduler getIoScheduler(INCServiceContext ctx) {
-        return ((INcApplicationContext) 
ctx.getApplicationContext()).getLSMIOScheduler();
+        return getAppCtx(ctx).getLSMIOScheduler();
     }
 
     @Override
     public IIOManager getIoManager(INCServiceContext ctx) {
-        return ((INcApplicationContext) 
ctx.getApplicationContext()).getPersistenceIoManager();
+        return getAppCtx(ctx).getPersistenceIoManager();
     }
 
     @Override
     public IBufferCache getBufferCache(INCServiceContext ctx) {
-        return ((INcApplicationContext) 
ctx.getApplicationContext()).getBufferCache();
+        return getAppCtx(ctx).getBufferCache();
     }
 
     @Override
     public ILocalResourceRepository 
getLocalResourceRepository(INCServiceContext ctx) {
-        return ((INcApplicationContext) 
ctx.getApplicationContext()).getLocalResourceRepository();
+        return getAppCtx(ctx).getLocalResourceRepository();
     }
 
     @Override
     public IDatasetLifecycleManager getLifecycleManager(INCServiceContext ctx) 
{
-        return ((INcApplicationContext) 
ctx.getApplicationContext()).getDatasetLifecycleManager();
+        return getAppCtx(ctx).getDatasetLifecycleManager();
     }
 
     @Override
     public IResourceIdFactory getResourceIdFactory(INCServiceContext ctx) {
-        return ((INcApplicationContext) 
ctx.getApplicationContext()).getResourceIdFactory();
+        return getAppCtx(ctx).getResourceIdFactory();
     }
 
     @Override
@@ -78,6 +79,16 @@ public class RuntimeComponentsProvider implements 
IStorageManager, ILSMIOOperati
         return registry.getClassIdentifier(getClass(), serialVersionUID);
     }
 
+    /**
+     * Resolves the disk cache monitoring service from the NC application context attached to {@code ctx}.
+     */
+    @Override
+    public IDiskCacheMonitoringService getDiskCacheMonitoringService(INCServiceContext ctx) {
+        INcApplicationContext appCtx = getAppCtx(ctx);
+        return appCtx.getDiskCacheService();
+    }
+
+    /**
+     * Casts the application context carried by the given service context to {@link INcApplicationContext}.
+     */
+    private INcApplicationContext getAppCtx(INCServiceContext ctx) {
+        return (INcApplicationContext) ctx.getApplicationContext();
+    }
+
     @SuppressWarnings("squid:S1172") // unused parameter
     public static IJsonSerializable fromJson(IPersistedResourceRegistry 
registry, JsonNode json) {
         return RUNTIME_PROVIDER;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
index 381d5ebe30..ba513facd9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hyracks.cloud.cache.service;
 
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 import org.apache.hyracks.storage.common.disk.prefetch.AbstractPrefetchRequest;
@@ -54,6 +56,11 @@ public final class 
CloudDiskCacheMonitoringAndPrefetchingService
         return true;
     }
 
+    /**
+     * Forwards the given local resources to the monitor thread.
+     *
+     * @param localResources the local resources to report (presumably keyed by resource ID -- confirm
+     *                       against the caller)
+     */
+    @Override
+    public void reportLocalResources(Map<Long, LocalResource> localResources) {
+        monitorThread.reportLocalResources(localResources);
+    }
+
     @Override
     public IPhysicalDrive getPhysicalDrive() {
         return drive;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
index 55bab43087..036a812bb1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
@@ -18,67 +18,151 @@
  */
 package org.apache.hyracks.cloud.cache.service;
 
-import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hyracks.cloud.cache.unit.AbstractIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 
-// TODO locking should be revised
 public final class CloudDiskResourceCacheLockNotifier implements 
IDiskResourceCacheLockNotifier {
-    private final Int2ObjectMap<DatasetUnit> datasets;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final int metadataPartition;
+    private final Long2ObjectMap<LocalResource> inactiveResources;
+    private final Long2ObjectMap<UnsweepableIndexUnit> unsweepableIndexes;
+    private final Long2ObjectMap<SweepableIndexUnit> sweepableIndexes;
+    private final ReentrantReadWriteLock evictionLock;
 
-    public CloudDiskResourceCacheLockNotifier() {
-        datasets = Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
+    public CloudDiskResourceCacheLockNotifier(int metadataPartition) {
+        this.metadataPartition = metadataPartition;
+        inactiveResources = Long2ObjectMaps.synchronize(new 
Long2ObjectOpenHashMap<>());
+        unsweepableIndexes = Long2ObjectMaps.synchronize(new 
Long2ObjectOpenHashMap<>());
+        sweepableIndexes = Long2ObjectMaps.synchronize(new 
Long2ObjectOpenHashMap<>());
+        evictionLock = new ReentrantReadWriteLock();
     }
 
     @Override
-    public void onRegister(int datasetId, LocalResource localResource, IIndex 
index) {
+    public void onRegister(LocalResource localResource, IIndex index, int 
partition) {
         ILSMIndex lsmIndex = (ILSMIndex) index;
-        if (lsmIndex.getDiskCacheManager().isSweepable()) {
-            DatasetUnit datasetUnit = datasets.computeIfAbsent(datasetId, 
DatasetUnit::new);
-            datasetUnit.addIndex(localResource.getId(), lsmIndex);
+        evictionLock.readLock().lock();
+        try {
+            if (partition != metadataPartition) {
+                long resourceId = localResource.getId();
+                if (lsmIndex.getDiskCacheManager().isSweepable()) {
+                    sweepableIndexes.put(resourceId, new 
SweepableIndexUnit(localResource, lsmIndex));
+                } else {
+                    unsweepableIndexes.put(resourceId, new 
UnsweepableIndexUnit(localResource));
+                }
+            }
+            inactiveResources.remove(localResource.getId());
+        } finally {
+            evictionLock.readLock().unlock();
         }
     }
 
     @Override
-    public void onUnregister(int datasetId, long resourceId) {
-        DatasetUnit datasetUnit = datasets.get(datasetId);
-        if (datasetUnit != null && datasetUnit.dropIndex(resourceId)) {
-            datasets.remove(datasetId);
+    public void onUnregister(long resourceId) {
+        evictionLock.readLock().lock();
+        try {
+            AbstractIndexUnit indexUnit = getUnit(resourceId);
+            if (indexUnit != null) {
+                indexUnit.drop();
+            } else {
+                inactiveResources.remove(resourceId);
+            }
+        } finally {
+            evictionLock.readLock().unlock();
+        }
+    }
 
-            // TODO invalidate eviction plans if the disk is not pressured
+    /**
+     * Looks up the index unit tracked for the given resource ID, checking the sweepable indexes first and
+     * falling back to the unsweepable ones.
+     *
+     * @return the tracked unit, or {@code null} if the resource is in neither map
+     */
+    private AbstractIndexUnit getUnit(long resourceId) {
+        AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId);
+        if (indexUnit == null) {
+            indexUnit = unsweepableIndexes.get(resourceId);
         }
+        return indexUnit;
     }
 
     @Override
-    public void onOpen(int datasetId, long resourceId) {
-        DatasetUnit datasetUnit = datasets.get(datasetId);
-        if (datasetUnit != null) {
-            IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
-            if (indexUnit != null) {
-                indexUnit.readLock();
+    public void onOpen(long resourceId) {
+        evictionLock.readLock().lock();
+        try {
+            AbstractIndexUnit indexUnit = getUnit(resourceId);
+            if (indexUnit == null) {
+                // Metadata resource
+                return;
             }
+            indexUnit.open();
+        } finally {
+            evictionLock.readLock().unlock();
         }
     }
 
     @Override
-    public void onClose(int datasetId, long resourceId) {
-        DatasetUnit datasetUnit = datasets.get(datasetId);
-        if (datasetUnit != null) {
-            IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
-            if (indexUnit != null) {
-                indexUnit.readUnlock();
+    public void onClose(long resourceId) {
+        evictionLock.readLock().lock();
+        try {
+            AbstractIndexUnit indexUnit = getUnit(resourceId);
+            if (indexUnit == null) {
+                // Metadata resource
+                return;
+            }
+            indexUnit.close();
+        } finally {
+            evictionLock.readLock().unlock();
+        }
+
+    }
+
+    /**
+     * Exposes the lock whose read side is held by the register/unregister/open/close callbacks of this
+     * notifier.
+     * NOTE(review): the write side is presumably taken by the eviction/sweeping code -- confirm at the caller.
+     */
+    ReentrantReadWriteLock getEvictionLock() {
+        return evictionLock;
+    }
+
+    /**
+     * Reconciles the tracked indexes with the provided local resources.
+     * <p>
+     * The inactive set is rebuilt from scratch: every provided resource that is tracked neither as sweepable
+     * nor as unsweepable is recorded as inactive. Tracked (sweepable/unsweepable) units whose resource IDs are
+     * absent from {@code localResources} are then removed from their maps, and the outcome is logged.
+     * <p>
+     * NOTE(review): unlike the on* callbacks, this method does not acquire {@code evictionLock} itself --
+     * confirm that callers serialize it against concurrent registrations.
+     *
+     * @param localResources the local resources to retain, keyed by resource ID
+     */
+    void reportLocalResources(Map<Long, LocalResource> localResources) {
+        inactiveResources.clear();
+        // First check whatever we had already
+        for (LocalResource lr : localResources.values()) {
+            if (unsweepableIndexes.containsKey(lr.getId()) || 
sweepableIndexes.containsKey(lr.getId())) {
+                // We already have this resource
+                continue;
             }
+
+            // Probably a new resource or an old resource that wasn't registered before
+            inactiveResources.put(lr.getId(), lr);
         }
+
+        // Drop tracked units that are no longer part of the reported resources
+        removeUnassignedResources(unsweepableIndexes, localResources);
+        removeUnassignedResources(sweepableIndexes, localResources);
+
+        LOGGER.info("Retained active {unsweepable: {}, sweepable: {}} and 
inactive: {}", unsweepableIndexes,
+                sweepableIndexes, inactiveResources.values().stream()
+                        .map(x -> "(id: " + x.getId() + ",  path: " + 
x.getPath() + ")").toList());
+    }
+
+    /**
+     * Removes from {@code indexes} every entry whose key is not present in {@code localResources}.
+     *
+     * @param indexes        map of tracked units keyed by resource ID
+     * @param localResources the resources that should be retained
+     */
+    private void removeUnassignedResources(Long2ObjectMap<?> indexes, 
Map<Long, LocalResource> localResources) {
+        indexes.long2ObjectEntrySet().removeIf(x -> 
!localResources.containsKey(x.getLongKey()));
+    }
+
+    /**
+     * @return the values collection of the inactive-resources map (returned as-is, not a defensive copy)
+     */
+    Collection<LocalResource> getInactiveResources() {
+        return inactiveResources.values();
+    }
+
+    /**
+     * @return the values collection of the unsweepable-indexes map (returned as-is, not a defensive copy)
+     */
+    Collection<UnsweepableIndexUnit> getUnsweepableIndexes() {
+        return unsweepableIndexes.values();
     }
 
-    Int2ObjectMap<DatasetUnit> getDatasets() {
-        return datasets;
+    /**
+     * Appends all currently tracked sweepable index units to the provided collection.
+     *
+     * @param indexes output collection to which the units are added; it is not cleared here
+     */
+    void getSweepableIndexes(Collection<SweepableIndexUnit> indexes) {
+        indexes.addAll(sweepableIndexes.values());
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
index 79b978e33d..f594e06784 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.cloud.cache.service;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -27,11 +28,12 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IDiskSpaceMaker;
-import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
 import org.apache.hyracks.cloud.sweeper.ISweeper;
 import org.apache.hyracks.cloud.sweeper.Sweeper;
+import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -44,21 +46,29 @@ public class DiskCacheSweeperThread implements Runnable, 
IDiskSpaceMaker {
     private final long waitTime;
     private final CloudDiskResourceCacheLockNotifier resourceManager;
     private final IPhysicalDrive physicalDrive;
-    private final List<IndexUnit> indexes;
+    private final List<SweepableIndexUnit> indexes;
+    private final ICloudIOManager cloudIOManager;
     private final ISweeper sweeper;
+    private final long inactiveTimeThreshold;
 
     public DiskCacheSweeperThread(ExecutorService executorService, long 
waitTime,
             CloudDiskResourceCacheLockNotifier resourceManager, 
ICloudIOManager cloudIOManager, int numOfSweepThreads,
             int sweepQueueSize, IPhysicalDrive physicalDrive, BufferCache 
bufferCache,
-            Map<Integer, BufferedFileHandle> fileInfoMap) {
+            Map<Integer, BufferedFileHandle> fileInfoMap, long 
inactiveTimeThreshold) {
         this.waitTime = TimeUnit.SECONDS.toMillis(waitTime);
         this.resourceManager = resourceManager;
         this.physicalDrive = physicalDrive;
+        this.inactiveTimeThreshold = inactiveTimeThreshold;
         indexes = new ArrayList<>();
+        this.cloudIOManager = cloudIOManager;
         sweeper = new Sweeper(executorService, cloudIOManager, bufferCache, 
fileInfoMap, numOfSweepThreads,
                 sweepQueueSize);
     }
 
+    public void reportLocalResources(Map<Long, LocalResource> localResources) {
+        resourceManager.reportLocalResources(localResources);
+    }
+
     @Override
     public void makeSpaceOrThrow(IOException ioException) throws 
HyracksDataException {
         if (ioException.getMessage().contains("no space")) {
@@ -83,7 +93,7 @@ public class DiskCacheSweeperThread implements Runnable, 
IDiskSpaceMaker {
         while (true) {
             synchronized (this) {
                 try {
-                    sweep();
+                    makeSpace();
                     wait(waitTime);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
@@ -94,20 +104,66 @@ public class DiskCacheSweeperThread implements Runnable, 
IDiskSpaceMaker {
         }
     }
 
-    private void sweep() {
+    private void makeSpace() {
         if (physicalDrive.computeAndCheckIsPressured()) {
-            for (DatasetUnit dataset : resourceManager.getDatasets().values()) 
{
-                indexes.clear();
-                dataset.getIndexes(indexes);
-                sweepIndexes(sweeper, indexes);
+            boolean shouldSweep;
+            resourceManager.getEvictionLock().writeLock().lock();
+            try {
+                shouldSweep = evictInactive();
+            } finally {
+                resourceManager.getEvictionLock().writeLock().unlock();
+            }
+
+            if (shouldSweep) {
+                // index eviction didn't help. Sweep!
+                sweep();
+            }
+        }
+    }
+
+    /**
+     * Evicts locally cached data that is not in active use: every resource reported by
+     * {@code resourceManager.getInactiveResources()} and every unsweepable index whose
+     * last access is at least {@code inactiveTimeThreshold} old. Eviction failures are
+     * logged and do not stop the remaining evictions.
+     * NOTE(review): the threshold is compared against a System.nanoTime() delta, so it is
+     * presumably expressed in nanoseconds -- confirm against the caller.
+     *
+     * @return {@code true} if there was nothing to evict, or if the physical drive is still
+     *         pressured after eviction (i.e., the caller should proceed with a sweep)
+     */
+    private boolean evictInactive() {
+        long now = System.nanoTime();
+        Collection<LocalResource> inactiveResources = 
resourceManager.getInactiveResources();
+        Collection<UnsweepableIndexUnit> unsweepableIndexes = 
resourceManager.getUnsweepableIndexes();
+        if (inactiveResources.isEmpty() && unsweepableIndexes.isEmpty()) {
+            // return true to run sweep as nothing will be evicted
+            return true;
+        }
+
+        // First evict all resources that have never been registered
+        for (LocalResource resource : inactiveResources) {
+            try {
+                cloudIOManager.evict(resource.getPath());
+            } catch (HyracksDataException e) {
+                LOGGER.error("Failed to evict resource " + resource.getPath(), 
e);
+            }
+        }
+
+        // Next evict all inactive indexes
+        for (UnsweepableIndexUnit index : unsweepableIndexes) {
+            if (now - index.getLastAccessTime() >= inactiveTimeThreshold) {
+                try {
+                    cloudIOManager.evict(index.getPath());
+                } catch (HyracksDataException e) {
+                    LOGGER.error("Failed to evict resource " + 
index.getPath(), e);
+                }
             }
         }
+
+        // If disk is still pressured, proceed with sweep
+        return physicalDrive.computeAndCheckIsPressured();
+    }
+
+    private void sweep() {
+        indexes.clear();
+        resourceManager.getSweepableIndexes(indexes);
+        sweepIndexes(sweeper, indexes);
     }
 
     @CriticalPath
-    private static void sweepIndexes(ISweeper sweeper, List<IndexUnit> 
indexes) {
+    private static void sweepIndexes(ISweeper sweeper, 
List<SweepableIndexUnit> indexes) {
         for (int i = 0; i < indexes.size(); i++) {
-            IndexUnit index = indexes.get(i);
+            SweepableIndexUnit index = indexes.get(i);
             if (!index.isSweeping()) {
                 try {
                     sweeper.sweep(index);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java
new file mode 100644
index 0000000000..153234009e
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.unit;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
+/**
+ * Base bookkeeping unit for an index tracked by the cloud disk cache. It keeps the
+ * backing {@link LocalResource}, the time of the most recent {@link #open()}, and the
+ * number of opens that have not yet been matched by a {@link #close()}.
+ */
+public abstract class AbstractIndexUnit {
+    protected final LocalResource localResource;
+    // System.nanoTime() captured by the most recent open(); 0 until the first open()
+    private final AtomicLong lastAccessTime;
+    // Number of open() calls not yet matched by close()
+    private final AtomicInteger openCounter;
+
+    AbstractIndexUnit(LocalResource localResource) {
+        this.localResource = localResource;
+        this.lastAccessTime = new AtomicLong(0);
+        this.openCounter = new AtomicInteger(0);
+    }
+
+    /**
+     * Records an open of this index: refreshes the last access time and counts the open.
+     */
+    public final void open() {
+        lastAccessTime.set(System.nanoTime());
+        // Increment (not merely read) the counter so that close() balances it
+        openCounter.incrementAndGet();
+    }
+
+    /**
+     * Records a close of this index by decrementing the open counter.
+     */
+    public final void close() {
+        openCounter.decrementAndGet();
+    }
+
+    /**
+     * @return the {@link System#nanoTime()} value captured by the most recent {@link #open()},
+     *         or 0 if the index was never opened
+     */
+    public final long getLastAccessTime() {
+        return lastAccessTime.get();
+    }
+
+    /**
+     * Invoked when the index is being dropped; the behavior is defined by the subclass.
+     */
+    public abstract void drop();
+
+    @Override
+    public String toString() {
+        return "(id: " + localResource.getId() + ", path: " + localResource.getPath() + ", sweepable: "
+                + isSweepable() + ")";
+    }
+
+    /**
+     * @return whether this unit reports itself as sweepable (used by {@link #toString()})
+     */
+    protected abstract boolean isSweepable();
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
deleted file mode 100644
index 34b37fbff9..0000000000
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.cloud.cache.unit;
-
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-
-public final class DatasetUnit {
-    private final int id;
-    private final ReentrantReadWriteLock lock;
-    /**
-     * Maps resourceId to {@link IndexUnit}
-     */
-    private final Long2ObjectMap<IndexUnit> indexes;
-
-    public DatasetUnit(int datasetId) {
-        id = datasetId;
-        lock = new ReentrantReadWriteLock();
-        indexes = new Long2ObjectOpenHashMap<>();
-
-    }
-
-    public int getId() {
-        return id;
-    }
-
-    public IndexUnit addIndex(long resourceId, ILSMIndex index) {
-        writeLock();
-        try {
-            IndexUnit indexUnit = new IndexUnit(resourceId, index);
-            indexes.put(resourceId, indexUnit);
-            return indexUnit;
-        } finally {
-            writeUnlock();
-        }
-    }
-
-    public boolean dropIndex(long resourceId) {
-        IndexUnit indexUnit = indexes.remove(resourceId);
-        // Signal that the index is being dropped so a sweeper thread does not 
sweep this index or stops sweeping
-        indexUnit.setDropped();
-        // Wait for the sweep operation (if running) before allowing the index 
to be dropped
-        indexUnit.waitForSweep();
-        return indexUnit.getIndex().isPrimaryIndex();
-    }
-
-    public IndexUnit getIndex(long resourceId) {
-        readLock();
-        try {
-            return indexes.get(resourceId);
-        } finally {
-            readUnlock();
-        }
-    }
-
-    /**
-     * Return the current indexes
-     *
-     * @param indexUnits container used to return the current indexes
-     */
-    public void getIndexes(List<IndexUnit> indexUnits) {
-        readLock();
-        try {
-            indexUnits.addAll(indexes.values());
-        } finally {
-            readUnlock();
-        }
-    }
-
-    private void readLock() {
-        lock.readLock().lock();
-    }
-
-    private void readUnlock() {
-        lock.readLock().unlock();
-    }
-
-    private void writeLock() {
-        lock.writeLock().lock();
-    }
-
-    private void writeUnlock() {
-        lock.writeLock().unlock();
-    }
-}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
similarity index 77%
rename from 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
rename to 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
index 8f8b412604..794cafc6ff 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
@@ -19,37 +19,38 @@
 package org.apache.hyracks.cloud.cache.unit;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.LocalResource;
 
-// TODO allow evicting an index entirely
-public final class IndexUnit {
-    private final long id;
+public final class SweepableIndexUnit extends AbstractIndexUnit {
     private final ILSMIndex index;
     private final AtomicBoolean dropped;
     private final AtomicBoolean sweeping;
-    private final AtomicInteger readCounter;
 
-    public IndexUnit(long resourceId, ILSMIndex index) {
-        this.id = resourceId;
+    public SweepableIndexUnit(LocalResource localResource, ILSMIndex index) {
+        super(localResource);
         this.index = index;
         dropped = new AtomicBoolean(false);
         sweeping = new AtomicBoolean(false);
-        readCounter = new AtomicInteger(0);
     }
 
-    public long getId() {
-        return id;
+    /**
+     * Flags this index as dropped and then calls {@code waitForSweep()} so that a running
+     * sweep (if any) is waited for before the drop proceeds.
+     */
+    @Override
+    public void drop() {
+        // Signal that the index is being dropped so a sweeper thread does not sweep this index or stops sweeping.
+        // The flag starts as false, so it must be set to true here for the signal to take effect.
+        dropped.set(true);
+        // Wait for the sweep operation (if running) before allowing the index to be dropped
+        waitForSweep();
     }
 
-    public ILSMIndex getIndex() {
-        return index;
+    @Override
+    protected boolean isSweepable() {
+        return true;
     }
 
-    public void setDropped() {
-        dropped.set(false);
+    public ILSMIndex getIndex() {
+        return index;
     }
 
     public boolean isDropped() {
@@ -79,13 +80,4 @@ public final class IndexUnit {
             sweeping.notifyAll();
         }
     }
-
-    public void readLock() {
-        readCounter.incrementAndGet();
-    }
-
-    public void readUnlock() {
-        readCounter.decrementAndGet();
-    }
-
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java
similarity index 65%
copy from 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
copy to 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java
index 0410e3bf13..204fe68c7a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java
@@ -16,18 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.cloud.sweeper;
+package org.apache.hyracks.cloud.cache.unit;
 
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.storage.common.LocalResource;
 
-public final class NoOpSweeper implements ISweeper {
-    public static final ISweeper INSTANCE = new NoOpSweeper();
-
-    private NoOpSweeper() {
+public final class UnsweepableIndexUnit extends AbstractIndexUnit {
+    public UnsweepableIndexUnit(LocalResource localResource) {
+        super(localResource);
     }
 
     @Override
-    public void sweep(IndexUnit indexUnit) {
+    public void drop() {
         // NoOp
     }
+
+    @Override
+    protected boolean isSweepable() {
+        return false;
+    }
+
+    public String getPath() {
+        return localResource.getPath();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 83b7629b6a..0dca41765b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -22,7 +22,6 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
 
 /**
@@ -86,9 +85,9 @@ public interface ICloudIOManager {
     int punchHole(IFileHandle fHandle, long offset, long length) throws 
HyracksDataException;
 
     /**
-     * Evict a directory from the local disk cache
+     * Evict a resource from the local disk cache
      *
-     * @param directory to evict
+     * @param resourcePath to evict
      */
-    void evict(FileReference directory) throws HyracksDataException;
+    void evict(String resourcePath) throws HyracksDataException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
index d064fda10f..9067a3fba6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.cloud.sweeper;
 
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 
 /**
@@ -31,5 +31,5 @@ public interface ISweeper {
      *
      * @param indexUnit to sweep
      */
-    void sweep(IndexUnit indexUnit) throws InterruptedException;
+    void sweep(SweepableIndexUnit indexUnit) throws InterruptedException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
index 0410e3bf13..ca103abd15 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.cloud.sweeper;
 
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 
 public final class NoOpSweeper implements ISweeper {
     public static final ISweeper INSTANCE = new NoOpSweeper();
@@ -27,7 +27,7 @@ public final class NoOpSweeper implements ISweeper {
     }
 
     @Override
-    public void sweep(IndexUnit indexUnit) {
+    public void sweep(SweepableIndexUnit indexUnit) {
         // NoOp
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
index 79fdb5a5b2..c709cc6f78 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -36,7 +36,7 @@ public final class SweepContext implements ISweepContext {
     private final BufferCache bufferCache;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
     private final AtomicBoolean shutdown;
-    private IndexUnit indexUnit;
+    private SweepableIndexUnit indexUnit;
     private BufferedFileHandle handle;
 
     public SweepContext(ICloudIOManager cloudIOManager, BufferCache 
bufferCache,
@@ -74,11 +74,11 @@ public final class SweepContext implements ISweepContext {
         bufferCache.unpin(page, bcOpCtx);
     }
 
-    public void setIndexUnit(IndexUnit indexUnit) {
+    public void setIndexUnit(SweepableIndexUnit indexUnit) {
         this.indexUnit = indexUnit;
     }
 
-    public IndexUnit getIndexUnit() {
+    public SweepableIndexUnit getIndexUnit() {
         return indexUnit;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
index e5d368c8fc..245c95793b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.InvokeUtil;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
@@ -56,7 +56,7 @@ public final class Sweeper implements ISweeper {
     }
 
     @Override
-    public void sweep(IndexUnit indexUnit) throws InterruptedException {
+    public void sweep(SweepableIndexUnit indexUnit) throws 
InterruptedException {
         SweepRequest request = freeRequests.take();
         request.reset(indexUnit);
         requests.put(request);
@@ -131,7 +131,7 @@ public final class Sweeper implements ISweeper {
             this.context = context;
         }
 
-        void reset(IndexUnit indexUnit) {
+        void reset(SweepableIndexUnit indexUnit) {
             context.setIndexUnit(indexUnit);
         }
 
@@ -151,7 +151,7 @@ public final class Sweeper implements ISweeper {
                  */
                 return;
             }
-            IndexUnit indexUnit = context.getIndexUnit();
+            SweepableIndexUnit indexUnit = context.getIndexUnit();
             indexUnit.startSweeping();
             try {
                 ILSMIndex index = indexUnit.getIndex();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index b9f277abf3..9909e97a5d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -45,6 +45,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOBulkOperation;
@@ -82,6 +83,8 @@ public class IOManager implements IIOManager {
      * Mutables
      */
     private int workspaceIndex;
+    // TODO use space maker on write
+    private IDiskSpaceMaker spaceMaker;
 
     public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver 
deviceComputer, int ioParallelism, int queueSize)
             throws HyracksDataException {
@@ -112,6 +115,7 @@ public class IOManager implements IIOManager {
         for (int i = 0; i < numIoThreads; i++) {
             executor.execute(new IoRequestHandler(i, submittedRequests));
         }
+        spaceMaker = NoOpDiskSpaceMaker.INSTANCE;
     }
 
     public int getQueueSize() {
@@ -596,4 +600,8 @@ public class IOManager implements IIOManager {
     public void performBulkOperation(IIOBulkOperation bulkOperation) throws 
HyracksDataException {
         ((AbstractBulkOperation) bulkOperation).performOperation();
     }
+
+    public void setSpaceMaker(IDiskSpaceMaker spaceMaker) {
+        this.spaceMaker = spaceMaker;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
similarity index 63%
copy from 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
copy to 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
index 0410e3bf13..1527dde053 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
@@ -16,18 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.cloud.sweeper;
+package org.apache.hyracks.control.nc.io;
 
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import java.io.IOException;
 
-public final class NoOpSweeper implements ISweeper {
-    public static final ISweeper INSTANCE = new NoOpSweeper();
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
 
-    private NoOpSweeper() {
+final class NoOpDiskSpaceMaker implements IDiskSpaceMaker {
+    static final IDiskSpaceMaker INSTANCE = new NoOpDiskSpaceMaker();
+
+    private NoOpDiskSpaceMaker() {
     }
 
     @Override
-    public void sweep(IndexUnit indexUnit) {
-        // NoOp
+    public void makeSpaceOrThrow(IOException e) throws HyracksDataException {
+        throw HyracksDataException.create(e);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
index bf870b1756..426e81db8f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
@@ -26,6 +26,8 @@ import 
org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.ResourceIdFactory;
 
 public class BTreeHelperStorageManager implements IStorageManager {
@@ -60,4 +62,9 @@ public class BTreeHelperStorageManager implements 
IStorageManager {
     public IResourceLifecycleManager<IIndex> 
getLifecycleManager(INCServiceContext ctx) {
         return RuntimeContext.get(ctx).getIndexLifecycleManager();
     }
+
+    @Override
+    public IDiskCacheMonitoringService 
getDiskCacheMonitoringService(INCServiceContext ctx) {
+        return NoOpDiskCacheMonitoringService.INSTANCE;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
index 4932bf0c7f..9fc3b8d064 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -27,7 +27,7 @@ import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
 import org.apache.hyracks.cloud.sweeper.SweepContext;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
@@ -59,7 +59,7 @@ public final class ColumnSweeper {
 
     public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector 
sweepProjector)
             throws HyracksDataException {
-        IndexUnit indexUnit = context.getIndexUnit();
+        SweepableIndexUnit indexUnit = context.getIndexUnit();
         LSMColumnBTree lsmColumnBTree = (LSMColumnBTree) indexUnit.getIndex();
         IColumnProjectionInfo projectionInfo = 
captureSweepableComponents(lsmColumnBTree, sweepProjector);
         if (projectionInfo == null) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
index a0b592a5e1..7ea4e353a8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -45,6 +45,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import 
org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import org.apache.hyracks.storage.common.IStorageManager;
 import 
org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -87,13 +88,14 @@ public class LSMColumnBTreeLocalResource extends 
LSMBTreeLocalResource {
         List<IVirtualBufferCache> vbcs = 
vbcProvider.getVirtualBufferCaches(serviceCtx, file);
         ioOpCallbackFactory.initialize(serviceCtx, this);
         pageWriteCallbackFactory.initialize(serviceCtx, this);
+        IDiskCacheMonitoringService diskCacheService = 
storageManager.getDiskCacheMonitoringService(serviceCtx);
         return LSMColumnBTreeUtil.createLSMTree(ioManager, vbcs, file, 
storageManager.getBufferCache(serviceCtx),
                 typeTraits, cmpFactories, bloomFilterKeyFields, 
bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, 
serviceCtx),
                 opTrackerProvider.getOperationTracker(serviceCtx, this), 
ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, 
metadataPageManagerFactory, false,
                 serviceCtx.getTracer(), compressorDecompressorFactory, 
nullTypeTraits, nullIntrospector,
-                columnManagerFactory, atomic);
+                columnManagerFactory, atomic, diskCacheService);
     }
 
     public static IJsonSerializable fromJson(IPersistedResourceRegistry 
registry, JsonNode json)
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 039aaedbf3..6335acd21a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -34,6 +34,7 @@ import 
org.apache.hyracks.storage.am.common.api.INullIntrospector;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManagerFactory;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.CloudColumnIndexDiskCacheManager;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.IColumnIndexDiskCacheManager;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.NoOpColumnIndexDiskCacheManager;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
@@ -55,6 +56,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.util.trace.ITracer;
 
 public class LSMColumnBTreeUtil {
@@ -66,13 +68,18 @@ public class LSMColumnBTreeUtil {
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, 
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
             int[] btreeFields, IMetadataPageManagerFactory 
freePageManagerFactory, boolean updateAware, ITracer tracer,
             ICompressorDecompressorFactory compressorDecompressorFactory, 
ITypeTraits nullTypeTraits,
-            INullIntrospector nullIntrospector, IColumnManagerFactory 
columnManagerFactory, boolean atomic)
-            throws HyracksDataException {
+            INullIntrospector nullIntrospector, IColumnManagerFactory 
columnManagerFactory, boolean atomic,
+            IDiskCacheMonitoringService diskCacheService) throws 
HyracksDataException {
+
         // Initialize managers
+        boolean diskCacheEnabled = diskCacheService.isEnabled();
         IColumnManager columnManager = 
columnManagerFactory.createColumnManager();
-        IColumnIndexDiskCacheManager diskCacheManager = 
NoOpColumnIndexDiskCacheManager.INSTANCE;
+        IColumnIndexDiskCacheManager diskCacheManager = diskCacheEnabled
+                ? new 
CloudColumnIndexDiskCacheManager(columnManager.getNumberOfPrimaryKeys(),
+                        columnManager.getMergeColumnProjector(), 
diskCacheService.getPhysicalDrive())
+                : NoOpColumnIndexDiskCacheManager.INSTANCE;
 
-        //Tuple writers
+        // Tuple writers
         LSMBTreeTupleWriterFactory insertTupleWriterFactory = new 
LSMBTreeTupleWriterFactory(typeTraits,
                 cmpFactories.length, false, updateAware, nullTypeTraits, 
nullIntrospector);
         LSMBTreeTupleWriterFactory deleteTupleWriterFactory = new 
LSMBTreeTupleWriterFactory(typeTraits,
@@ -82,7 +89,7 @@ public class LSMColumnBTreeUtil {
         LSMBTreeTupleWriterFactory bulkLoadTupleWriterFactory = new 
LSMBTreeTupleWriterFactory(typeTraits,
                 cmpFactories.length, false, updateAware, nullTypeTraits, 
nullIntrospector);
 
-        //Leaf frames
+        // Leaf frames
         ITreeIndexFrameFactory flushLeafFrameFactory = new 
ColumnBTreeLeafFrameFactory(copyTupleWriterFactory,
                 columnManagerFactory.getFlushColumnTupleReaderWriterFactory());
         ITreeIndexFrameFactory mergeLeafFrameFactory = new 
ColumnBTreeLeafFrameFactory(copyTupleWriterFactory,
@@ -93,7 +100,7 @@ public class LSMColumnBTreeUtil {
         ITreeIndexFrameFactory deleteLeafFrameFactory = new 
BTreeNSMLeafFrameFactory(deleteTupleWriterFactory);
         ITreeIndexFrameFactory interiorFrameFactory = new 
BTreeNSMInteriorFrameFactory(insertTupleWriterFactory);
 
-        //BTree factory
+        // BTree factory
         TreeIndexFactory<ColumnBTree> flushBTreeFactory = new 
ColumnBTreeFactory(ioManager, diskBufferCache,
                 freePageManagerFactory, interiorFrameFactory, 
flushLeafFrameFactory, cmpFactories, typeTraits.length);
         TreeIndexFactory<ColumnBTree> mergeBTreeFactory = new 
ColumnBTreeFactory(ioManager, diskBufferCache,
@@ -102,8 +109,8 @@ public class LSMColumnBTreeUtil {
                 new ColumnBTreeFactory(ioManager, diskBufferCache, 
freePageManagerFactory, interiorFrameFactory,
                         bulkLoadLeafFrameFactory, cmpFactories, 
typeTraits.length);
 
-        ILSMIndexFileManager fileNameManager =
-                new LSMBTreeFileManager(ioManager, file, flushBTreeFactory, 
true, compressorDecompressorFactory);
+        ILSMIndexFileManager fileNameManager = new 
LSMBTreeFileManager(ioManager, file, flushBTreeFactory, true,
+                compressorDecompressorFactory, diskCacheEnabled);
 
         BloomFilterFactory bloomFilterFactory = new 
BloomFilterFactory(diskBufferCache, bloomFilterKeyFields);
         ILSMDiskComponentFactory flushComponentFactory =
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 98b82db8be..e2e49e3d47 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -45,18 +45,20 @@ public class LSMBTreeFileManager extends 
AbstractLSMIndexFileManager {
             (dir, name) -> !name.startsWith(".") && 
name.endsWith(BTREE_SUFFIX);
     private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
     private final boolean hasBloomFilter;
+    private final boolean allowHoles;
+
+    public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
+            TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean 
hasBloomFilter) {
+        this(ioManager, file, btreeFactory, hasBloomFilter, 
NoOpCompressorDecompressorFactory.INSTANCE, false);
+    }
 
     public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
             TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean 
hasBloomFilter,
-            ICompressorDecompressorFactory compressorDecompressorFactory) {
+            ICompressorDecompressorFactory compressorDecompressorFactory, 
boolean allowHoles) {
         super(ioManager, file, null, compressorDecompressorFactory);
         this.btreeFactory = btreeFactory;
         this.hasBloomFilter = hasBloomFilter;
-    }
-
-    public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
-            TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean 
hasBloomFilter) {
-        this(ioManager, file, btreeFactory, hasBloomFilter, 
NoOpCompressorDecompressorFactory.INSTANCE);
+        this.allowHoles = allowHoles;
     }
 
     @Override
@@ -180,4 +182,9 @@ public class LSMBTreeFileManager extends 
AbstractLSMIndexFileManager {
 
         return validFiles;
     }
+
+    @Override
+    protected boolean areHolesAllowed() {
+        return allowHoles;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index 3106b2924c..6bd236c191 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -122,7 +122,7 @@ public class LSMBTreeUtil {
             filterManager = new LSMComponentFilterManager(filterFrameFactory);
         }
         ILSMIndexFileManager fileNameManager = new 
LSMBTreeFileManager(ioManager, file, diskBTreeFactory,
-                hasBloomFilter, compressorDecompressorFactory);
+                hasBloomFilter, compressorDecompressorFactory, false);
 
         ILSMDiskComponentFactory componentFactory;
         ILSMDiskComponentFactory bulkLoadComponentFactory;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
index 9d0880e9fa..47492e8dab 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 /**
@@ -59,4 +60,10 @@ public interface IStorageManager extends Serializable, 
IJsonSerializable {
      * @return the resource lifecycle manager
      */
     IResourceLifecycleManager<IIndex> getLifecycleManager(INCServiceContext 
ctx);
+
+    /**
+     * @param ctx the nc service context
+     * @return disk cache monitoring service
+     */
+    IDiskCacheMonitoringService 
getDiskCacheMonitoringService(INCServiceContext ctx);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
index 5fb7088c67..b957782d14 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hyracks.storage.common.disk;
 
+import java.util.Map;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
 /**
  * Disk cache monitoring service is responsible for monitor the local drives
  */
@@ -37,6 +41,13 @@ public interface IDiskCacheMonitoringService {
      */
     boolean isEnabled();
 
+    /**
+     * Report all local resources
+     *
+     * @param localResources local resources
+     */
+    void reportLocalResources(Map<Long, LocalResource> localResources);
+
     /**
      * @return physical drive
      */
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
index fc4645d925..1e986b3e47 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
@@ -31,34 +31,31 @@ public interface IDiskResourceCacheLockNotifier {
      * Notify registering a new resource
      * Note: this method is not thread-safe outside {@link 
org.apache.hyracks.storage.common.IResourceLifecycleManager}
      *
-     * @param datasetId     dataset ID
      * @param localResource resource to be registered
      * @param index         of the resource
+     * @param partition     partition
      */
-    void onRegister(int datasetId, LocalResource localResource, IIndex index);
+    void onRegister(LocalResource localResource, IIndex index, int partition);
 
     /**
      * Notify unregistering an existing resource
      * Note: this method is not thread-safe outside {@link 
org.apache.hyracks.storage.common.IResourceLifecycleManager}
      *
-     * @param datasetId  dataset ID
      * @param resourceId resource ID
      */
-    void onUnregister(int datasetId, long resourceId);
+    void onUnregister(long resourceId);
 
     /**
      * Notify opening a resource
      *
-     * @param datasetId  dataset ID
      * @param resourceId resource ID
      */
-    void onOpen(int datasetId, long resourceId);
+    void onOpen(long resourceId);
 
     /**
      * Notify closing a resource
      *
-     * @param datasetId  dataset ID
      * @param resourceId resource ID
      */
-    void onClose(int datasetId, long resourceId);
+    void onClose(long resourceId);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
index 9a7a5d693c..d6155a4d0f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hyracks.storage.common.disk;
 
+import java.util.Map;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
 public final class NoOpDiskCacheMonitoringService implements 
IDiskCacheMonitoringService {
     public static final IDiskCacheMonitoringService INSTANCE = new 
NoOpDiskCacheMonitoringService();
 
@@ -39,6 +43,11 @@ public final class NoOpDiskCacheMonitoringService implements 
IDiskCacheMonitorin
         return false;
     }
 
+    @Override
+    public void reportLocalResources(Map<Long, LocalResource> localResources) {
+        // NoOp
+    }
+
     @Override
     public IPhysicalDrive getPhysicalDrive() {
         return DummyPhysicalDrive.INSTANCE;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
index 3c0d6c998a..b83c388466 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
@@ -28,22 +28,22 @@ public final class NoOpDiskResourceCacheLockNotifier 
implements IDiskResourceCac
     }
 
     @Override
-    public void onRegister(int datasetId, LocalResource localResource, IIndex 
index) {
+    public void onRegister(LocalResource localResource, IIndex index, int 
partition) {
         // NoOp
     }
 
     @Override
-    public void onUnregister(int datasetId, long resourceId) {
+    public void onUnregister(long resourceId) {
         // NoOp
     }
 
     @Override
-    public void onOpen(int datasetId, long resourceId) {
+    public void onOpen(long resourceId) {
         // NoOp
     }
 
     @Override
-    public void onClose(int datasetId, long resourceId) {
+    public void onClose(long resourceId) {
         // NoOp
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
index e54ce2c399..0b4d2ed72b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
@@ -26,6 +26,8 @@ import 
org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.ResourceIdFactory;
 
 public class TestStorageManager implements IStorageManager {
@@ -60,4 +62,9 @@ public class TestStorageManager implements IStorageManager {
         return TestStorageManagerComponentHolder.getIndexLifecycleManager();
     }
 
+    @Override
+    public IDiskCacheMonitoringService 
getDiskCacheMonitoringService(INCServiceContext ctx) {
+        return NoOpDiskCacheMonitoringService.INSTANCE;
+    }
+
 }

Reply via email to