This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f406f2a2a4 [ASTERIXDB-3184][STO] Unify I/O APIs
f406f2a2a4 is described below

commit f406f2a2a4659fa5e70c7d112692bb055080d5a3
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Tue May 16 16:38:07 2023 -0700

    [ASTERIXDB-3184][STO] Unify I/O APIs
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Unify file operations by using IIOManager
    
    Change-Id: Ifb77c24ee855537bcf725b2eb1e28b1af200f3ae
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17524
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../asterix/app/nc/IndexCheckpointManager.java     |  62 ++---
 .../app/nc/IndexCheckpointManagerProvider.java     |   6 +-
 .../org/apache/asterix/cloud/CloudIOManager.java   |  12 +-
 .../ioopcallbacks/LSMIOOperationCallback.java      |  12 +-
 .../asterix/common/utils/StoragePathUtil.java      |   6 +-
 .../messaging/CheckpointPartitionIndexesTask.java  |   9 +-
 .../replication/messaging/ComponentMaskTask.java   |  16 +-
 .../replication/messaging/DeleteFileTask.java      |   9 +-
 .../replication/messaging/DropIndexTask.java       |  11 +-
 .../messaging/MarkComponentValidTask.java          |   9 +-
 .../replication/messaging/ReplicateFileTask.java   |  33 ++-
 .../PersistentLocalResourceRepository.java         | 255 ++++++++++-----------
 .../org/apache/hyracks/api/io/FileReference.java   |   6 +-
 .../java/org/apache/hyracks/api/io/IIOManager.java |   7 +
 .../apache/hyracks/control/nc/io/FileHandle.java   |   8 +
 .../apache/hyracks/control/nc/io/IOManager.java    |  23 +-
 .../am/lsm/btree/impls/LSMBTreeFileManager.java    |  10 +-
 .../common/impls/AbstractLSMIndexFileManager.java  |  40 +---
 18 files changed, 268 insertions(+), 266 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 290734f046..2ed163812e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -18,15 +18,11 @@
  */
 package org.apache.asterix.app.nc;
 
-import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.channels.ClosedByInterruptException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 
@@ -34,9 +30,11 @@ import 
org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IndexCheckpoint;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.util.annotations.ThreadSafe;
-import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -49,10 +47,12 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
     private static final FilenameFilter CHECKPOINT_FILE_FILTER =
             (file, name) -> 
name.startsWith(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX);
     private static final long BULKLOAD_LSN = 0;
-    private final Path indexPath;
+    private final FileReference indexPath;
+    private final IIOManager ioManager;
 
-    public IndexCheckpointManager(Path indexPath) {
+    public IndexCheckpointManager(FileReference indexPath, IIOManager 
ioManager) {
         this.indexPath = indexPath;
+        this.ioManager = ioManager;
     }
 
     @Override
@@ -154,7 +154,7 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
         }
         if (checkpoints.isEmpty()) {
             LOGGER.warn("Couldn't find any checkpoint file for index {}. 
Content of dir are {}.", indexPath,
-                    Arrays.toString(indexPath.toFile().listFiles()));
+                    ioManager.getMatchingFiles(indexPath, 
IoUtil.NO_OP_FILTER).toString());
             throw new IllegalStateException("Couldn't find any checkpoints for 
resource: " + indexPath);
         }
         
checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
@@ -180,13 +180,13 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
         }
     }
 
-    private List<IndexCheckpoint> getCheckpoints() throws 
ClosedByInterruptException {
+    private List<IndexCheckpoint> getCheckpoints() throws 
ClosedByInterruptException, HyracksDataException {
         List<IndexCheckpoint> checkpoints = new ArrayList<>();
-        final File[] checkpointFiles = 
indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
-        if (checkpointFiles != null) {
-            for (File checkpointFile : checkpointFiles) {
+        final Collection<FileReference> checkpointFiles = 
ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+        if (!checkpointFiles.isEmpty()) {
+            for (FileReference checkpointFile : checkpointFiles) {
                 try {
-                    checkpoints.add(read(checkpointFile.toPath()));
+                    checkpoints.add(read(checkpointFile));
                 } catch (ClosedByInterruptException e) {
                     throw e;
                 } catch (IOException e) {
@@ -198,14 +198,14 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
     }
 
     private void persist(IndexCheckpoint checkpoint) throws 
HyracksDataException {
-        final Path checkpointPath = getCheckpointPath(checkpoint);
+        final FileReference checkpointPath = getCheckpointPath(checkpoint);
         for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
             try {
                 // clean up from previous write failure
-                if (checkpointPath.toFile().exists()) {
-                    Files.delete(checkpointPath);
+                if (ioManager.exists(checkpointPath)) {
+                    ioManager.delete(checkpointPath);
                 }
-                FileUtil.writeAndForce(checkpointPath, 
checkpoint.asJson().getBytes());
+                ioManager.overwrite(checkpointPath, 
checkpoint.asJson().getBytes());
                 // ensure it was written correctly by reading it
                 read(checkpointPath);
                 return;
@@ -223,17 +223,18 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
         }
     }
 
-    private IndexCheckpoint read(Path checkpointPath) throws IOException {
-        return IndexCheckpoint.fromJson(new 
String(Files.readAllBytes(checkpointPath)));
+    private IndexCheckpoint read(FileReference checkpointPath) throws 
IOException {
+        return IndexCheckpoint.fromJson(new 
String(ioManager.readAllBytes(checkpointPath)));
     }
 
     private void deleteHistory(long latestId, int historyToKeep) {
         try {
-            final File[] checkpointFiles = 
indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
-            if (checkpointFiles != null) {
-                for (File checkpointFile : checkpointFiles) {
-                    if (getCheckpointIdFromFileName(checkpointFile.toPath()) < 
(latestId - historyToKeep)) {
-                        Files.delete(checkpointFile.toPath());
+            final Collection<FileReference> checkpointFiles =
+                    ioManager.getMatchingFiles(indexPath, 
CHECKPOINT_FILE_FILTER);
+            if (!checkpointFiles.isEmpty()) {
+                for (FileReference checkpointFile : checkpointFiles) {
+                    if (getCheckpointIdFromFileName(checkpointFile) < 
(latestId - historyToKeep)) {
+                        ioManager.delete(checkpointFile);
                     }
                 }
             }
@@ -242,13 +243,12 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
         }
     }
 
-    private Path getCheckpointPath(IndexCheckpoint checkpoint) {
-        return Paths.get(indexPath.toString(),
-                StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX + 
String.valueOf(checkpoint.getId()));
+    private FileReference getCheckpointPath(IndexCheckpoint checkpoint) {
+        return 
indexPath.getChild(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX + 
checkpoint.getId());
     }
 
-    private long getCheckpointIdFromFileName(Path checkpointPath) {
-        return Long.valueOf(checkpointPath.getFileName().toString()
-                
.substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
+    private long getCheckpointIdFromFileName(FileReference checkpointPath) {
+        return Long
+                
.parseLong(checkpointPath.getName().substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
index e0b3105a80..1e08ed87d4 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.app.nc;
 
-import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -27,6 +26,7 @@ import 
org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 
 public class IndexCheckpointManagerProvider implements 
IIndexCheckpointManagerProvider {
@@ -54,8 +54,8 @@ public class IndexCheckpointManagerProvider implements 
IIndexCheckpointManagerPr
 
     private IndexCheckpointManager create(ResourceReference ref) {
         try {
-            final Path indexPath = StoragePathUtil.getIndexPath(ioManager, 
ref);
-            return new IndexCheckpointManager(indexPath);
+            final FileReference indexPath = 
StoragePathUtil.getIndexPath(ioManager, ref);
+            return new IndexCheckpointManager(indexPath, ioManager);
         } catch (HyracksDataException e) {
             throw new IllegalStateException(e);
         }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
index 6f4ce696ed..97d8d80889 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
@@ -18,7 +18,8 @@
  */
 package org.apache.asterix.cloud;
 
-import static org.apache.asterix.common.utils.StorageConstants.*;
+import static 
org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static 
org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -151,6 +152,7 @@ public class CloudIOManager extends IOManager {
         super.close(fHandle);
     }
 
+    // TODO This method should not do any syncing. It simply should list the 
files
     @Override
     public Set<FileReference> list(FileReference dir, FilenameFilter filter) 
throws HyracksDataException {
         Set<String> cloudFiles = cloudClient.listObjects(bucket, 
dir.getRelativePath(), filter);
@@ -222,6 +224,14 @@ public class CloudIOManager extends IOManager {
         return super.getSize(fileHandle);
     }
 
+    @Override
+    public long getSize(FileReference fileReference) {
+        if (!fileReference.getFile().exists()) {
+            return cloudClient.getObjectSize(bucket, 
fileReference.getRelativePath());
+        }
+        return super.getSize(fileReference);
+    }
+
     @Override
     public void overwrite(FileReference fileRef, byte[] bytes) throws 
ClosedByInterruptException, HyracksDataException {
         super.overwrite(fileRef, bytes);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 1189b51960..999636da13 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -21,8 +21,6 @@ package org.apache.asterix.common.ioopcallbacks;
 
 import static 
org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
 
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Deque;
@@ -77,7 +75,7 @@ public class LSMIOOperationCallback implements 
ILSMIOOperationCallback {
     private long firstLsnForCurrentMemoryComponent = 0L;
     private long persistenceLsn = 0L;
     private int pendingFlushes = 0;
-    private Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
+    private final Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
 
     public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, 
ILSMComponentId componentId,
             IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
@@ -307,10 +305,8 @@ public class LSMIOOperationCallback implements 
ILSMIOOperationCallback {
 
     private static FileReference getOperationMaskFilePath(ILSMIOOperation 
operation) {
         FileReference target = operation.getTarget();
-        final String componentSequence = 
getComponentSequence(target.getFile().getAbsolutePath());
-        Path idxRelPath = Paths.get(target.getRelativePath()).getParent();
-        Path maskFileRelPath =
-                Paths.get(idxRelPath.toString(), 
StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
-        return new FileReference(target.getDeviceHandle(), 
maskFileRelPath.toString());
+        String componentSequence = 
getComponentSequence(target.getFile().getAbsolutePath());
+        FileReference idxRelPath = target.getParent();
+        return idxRelPath.getChild(StorageConstants.COMPONENT_MASK_FILE_PREFIX 
+ componentSequence);
     }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index a9ed066009..9702b18491 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.common.utils;
 
 import java.io.File;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Iterator;
 
@@ -31,6 +30,7 @@ import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConst
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.DefaultIoDeviceFileSplit;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.MappedFileSplit;
@@ -180,7 +180,7 @@ public class StoragePathUtil {
      * @return
      * @throws HyracksDataException
      */
-    public static Path getIndexPath(IIOManager ioManager, ResourceReference 
ref) throws HyracksDataException {
-        return 
ioManager.resolve(ref.getRelativePath().toString()).getFile().toPath();
+    public static FileReference getIndexPath(IIOManager ioManager, 
ResourceReference ref) throws HyracksDataException {
+        return ioManager.resolve(ref.getRelativePath().toString());
     }
 }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 97b6556040..a6ba0e2916 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -24,7 +24,6 @@ import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Path;
 import java.util.Collection;
 
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -36,6 +35,7 @@ import org.apache.asterix.replication.api.IReplicaTask;
 import org.apache.asterix.replication.api.IReplicationWorker;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
@@ -69,14 +69,15 @@ public class CheckpointPartitionIndexesTask implements 
IReplicaTask {
             DatasetResourceReference ref = DatasetResourceReference.of(ls);
             final IIndexCheckpointManager indexCheckpointManager = 
indexCheckpointManagerProvider.get(ref);
             // Get most recent sequence of existing files to avoid deletion
-            Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
-            String[] files = 
indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+            FileReference indexPath = StoragePathUtil.getIndexPath(ioManager, 
ref);
+            Collection<FileReference> files =
+                    ioManager.getMatchingFiles(indexPath, 
AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
             if (files == null) {
                 throw HyracksDataException
                         .create(new IOException(indexPath + " is not a 
directory or an IO Error occurred"));
             }
             long maxComponentSequence = UNINITIALIZED_COMPONENT_SEQ;
-            for (String file : files) {
+            for (FileReference file : files) {
                 maxComponentSequence =
                         Math.max(maxComponentSequence, 
IndexComponentFileReference.of(file).getSequenceEnd());
             }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index 3f04bd2c5c..e9af85c204 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -22,9 +22,6 @@ import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
@@ -50,21 +47,22 @@ public class ComponentMaskTask implements IReplicaTask {
     @Override
     public void perform(INcApplicationContext appCtx, IReplicationWorker 
worker) {
         try {
+            IIOManager ioManager = appCtx.getIoManager();
             // create mask
-            final Path maskPath = getComponentMaskPath(appCtx, file);
-            Files.createFile(maskPath);
+            final FileReference maskPath = getComponentMaskPath(ioManager, 
file);
+            ioManager.create(maskPath);
             ReplicationProtocol.sendAck(worker.getChannel(), 
worker.getReusableBuffer());
         } catch (IOException e) {
             throw new ReplicationException(e);
         }
     }
 
-    public static Path getComponentMaskPath(INcApplicationContext appCtx, 
String componentFile) throws IOException {
-        final IIOManager ioManager = appCtx.getIoManager();
+    public static FileReference getComponentMaskPath(IIOManager ioManager, 
String componentFile) throws IOException {
         final FileReference localPath = ioManager.resolve(componentFile);
-        final Path resourceDir = 
Files.createDirectories(localPath.getFile().getParentFile().toPath());
+        final FileReference resourceDir = localPath.getParent();
+        ioManager.makeDirectories(resourceDir);
         final String componentSequence = 
ResourceReference.getComponentSequence(componentFile);
-        return Paths.get(resourceDir.toString(), 
StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
+        return 
resourceDir.getChild(StorageConstants.COMPONENT_MASK_FILE_PREFIX + 
componentSequence);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
index 92e4989b40..a00acfbdea 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -20,10 +20,8 @@ package org.apache.asterix.replication.messaging;
 
 import java.io.DataInput;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Files;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
@@ -32,6 +30,7 @@ import org.apache.asterix.replication.api.IReplicaTask;
 import org.apache.asterix.replication.api.IReplicationWorker;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -52,9 +51,9 @@ public class DeleteFileTask implements IReplicaTask {
     public void perform(INcApplicationContext appCtx, IReplicationWorker 
worker) {
         try {
             final IIOManager ioManager = appCtx.getIoManager();
-            final File localFile = ioManager.resolve(file).getFile();
-            if (localFile.exists()) {
-                Files.delete(localFile.toPath());
+            final FileReference localFile = ioManager.resolve(file);
+            if (ioManager.exists(localFile)) {
+                ioManager.delete(localFile);
                 ResourceReference replicaRes = 
ResourceReference.of(localFile.getAbsolutePath());
                 if (replicaRes.isMetadataResource()) {
                     ((PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository())
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
index 483561b777..2b0e2d8cf5 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
@@ -20,7 +20,6 @@ package org.apache.asterix.replication.messaging;
 
 import java.io.DataInput;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -31,8 +30,8 @@ import org.apache.asterix.replication.api.IReplicaTask;
 import org.apache.asterix.replication.api.IReplicationWorker;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -52,10 +51,10 @@ public class DropIndexTask implements IReplicaTask {
     public void perform(INcApplicationContext appCtx, IReplicationWorker 
worker) {
         try {
             final IIOManager ioManager = appCtx.getIoManager();
-            final File indexFile = ioManager.resolve(file).getFile();
-            if (indexFile.exists()) {
-                File indexDir = indexFile.getParentFile();
-                IoUtil.delete(indexDir);
+            final FileReference indexFile = ioManager.resolve(file);
+            if (ioManager.exists(indexFile)) {
+                FileReference indexDir = indexFile.getParent();
+                ioManager.deleteDirectory(indexDir);
                 ((PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository())
                         
.invalidateResource(ResourceReference.of(file).getRelativePath().toString());
                 LOGGER.info(() -> "Deleted index: " + 
indexFile.getAbsolutePath());
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index fa77378619..76fde09f7b 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -22,8 +22,6 @@ import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -36,6 +34,8 @@ import org.apache.asterix.replication.api.IReplicaTask;
 import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.replication.sync.IndexSynchronizer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.util.ThreadDumpUtil;
 import org.apache.logging.log4j.LogManager;
@@ -67,9 +67,10 @@ public class MarkComponentValidTask implements IReplicaTask {
             } else if (masterLsn != IndexSynchronizer.MERGE_LSN) {
                 ensureComponentLsnFlushed(appCtx);
             }
+            IIOManager ioManager = appCtx.getIoManager();
             // delete mask
-            final Path maskPath = 
ComponentMaskTask.getComponentMaskPath(appCtx, file);
-            Files.delete(maskPath);
+            final FileReference maskPath = 
ComponentMaskTask.getComponentMaskPath(ioManager, file);
+            ioManager.delete(maskPath);
             ReplicationProtocol.sendAck(worker.getChannel(), 
worker.getReusableBuffer());
         } catch (IOException | InterruptedException e) {
             throw new ReplicationException(e);
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 500a5def24..71ed63ef9d 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -24,11 +24,7 @@ import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
@@ -40,10 +36,10 @@ import org.apache.asterix.replication.api.IReplicaTask;
 import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.replication.management.NetworkingUtil;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.FileHandle;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -73,31 +69,34 @@ public class ReplicateFileTask implements IReplicaTask {
             final IIOManager ioManager = appCtx.getIoManager();
             // resolve path
             final FileReference localPath = ioManager.resolve(file);
-            final Path resourceDir = 
Files.createDirectories(localPath.getFile().getParentFile().toPath());
+            FileReference resourceDir = localPath.getParent();
+            ioManager.makeDirectories(resourceDir);
             if (indexMetadata) {
                 // ensure clean index directory
-                FileUtils.cleanDirectory(resourceDir.toFile());
+                ioManager.cleanDirectory(resourceDir);
                 ((PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository())
                         
.invalidateResource(ResourceReference.of(file).getRelativePath().toString());
             }
             // create mask
-            final Path maskPath = Paths.get(resourceDir.toString(),
-                    StorageConstants.MASK_FILE_PREFIX + 
localPath.getFile().getName());
-            Files.createFile(maskPath);
+            final FileReference maskPath =
+                    resourceDir.getChild(StorageConstants.MASK_FILE_PREFIX + 
localPath.getName());
+            ioManager.create(maskPath);
             // receive actual file
-            final Path filePath = Paths.get(resourceDir.toString(), 
localPath.getFile().getName());
-            Files.createFile(filePath);
-            try (RandomAccessFile fileOutputStream = new 
RandomAccessFile(filePath.toFile(), "rw");
-                    FileChannel fileChannel = fileOutputStream.getChannel()) {
-                fileOutputStream.setLength(size);
+            ioManager.create(localPath);
+            FileHandle fileHandle = (FileHandle) ioManager.open(localPath, 
IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            try (FileChannel fileChannel = fileHandle.getFileChannel()) {
+                fileHandle.setLength(size);
                 NetworkingUtil.downloadFile(fileChannel, worker.getChannel());
-                fileChannel.force(true);
+                ioManager.sync(fileHandle, true);
+            } finally {
+                ioManager.close(fileHandle);
             }
             if (indexMetadata) {
                 initIndexCheckpoint(appCtx);
             }
             //delete mask
-            Files.delete(maskPath);
+            ioManager.delete(maskPath);
             LOGGER.debug("received file {} from master", localPath);
             ReplicationProtocol.sendAck(worker.getChannel(), 
worker.getReusableBuffer());
         } catch (IOException e) {
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 1e813462b3..8ef55e89f2 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -21,6 +21,7 @@ package org.apache.asterix.transaction.management.resource;
 import static 
org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
 import static 
org.apache.asterix.common.utils.StorageConstants.INDEX_NON_DATA_FILES_PREFIX;
 import static 
org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
+import static 
org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
 import static 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
 import static 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;
@@ -28,12 +29,8 @@ import static 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFil
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,7 +41,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -59,7 +55,6 @@ import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.storage.ResourceStorageStats;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
@@ -68,14 +63,12 @@ import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 import 
org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.util.ExitUtil;
-import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -88,18 +81,20 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
 
     private static final Logger LOGGER = LogManager.getLogger();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     private static final String METADATA_FILE_MASK_NAME =
             StorageConstants.MASK_FILE_PREFIX + 
StorageConstants.METADATA_FILE_NAME;
     private static final FilenameFilter LSM_INDEX_FILES_FILTER =
             (dir, name) -> name.startsWith(METADATA_FILE_NAME) || 
!name.startsWith(INDEX_NON_DATA_FILES_PREFIX);
     private static final FilenameFilter MASK_FILES_FILTER =
             (dir, name) -> name.startsWith(StorageConstants.MASK_FILE_PREFIX);
-    private static final int MAX_CACHED_RESOURCES = 1000;
     private static final FilenameFilter METADATA_FILES_FILTER =
             (dir, name) -> name.equals(StorageConstants.METADATA_FILE_NAME);
     private static final FilenameFilter METADATA_MASK_FILES_FILTER =
             (dir, name) -> name.equals(METADATA_FILE_MASK_NAME);
 
+    private static final int MAX_CACHED_RESOURCES = 1000;
+
     // Finals
     private final IIOManager ioManager;
     private final Cache<String, LocalResource> resourceCache;
@@ -107,7 +102,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
     private IReplicationManager replicationManager;
-    private final List<Path> storageRoots;
+    private final List<FileReference> storageRoots;
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
     private final IPersistedResourceRegistry persistedResourceRegistry;
 
@@ -120,8 +115,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         storageRoots = new ArrayList<>();
         final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
         for (int i = 0; i < ioDevices.size(); i++) {
-            storageRoots.add(
-                    Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), 
StorageConstants.STORAGE_ROOT_DIR_NAME));
+            storageRoots.add(new FileReference(ioDevices.get(i), 
STORAGE_ROOT_DIR_NAME));
         }
         createStorageRoots();
         resourceCache = 
CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -131,7 +125,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     public String toString() {
         StringBuilder aString = new 
StringBuilder().append(PersistentLocalResourceRepository.class.getSimpleName())
                 
.append(Character.LINE_SEPARATOR).append(ioManager.getClass().getSimpleName()).append(':')
-                
.append(Character.LINE_SEPARATOR).append(ioManager.toString()).append(Character.LINE_SEPARATOR)
+                
.append(Character.LINE_SEPARATOR).append(ioManager).append(Character.LINE_SEPARATOR)
                 .append("Cached Resources:").append(Character.LINE_SEPARATOR);
         resourceCache.asMap().forEach(
                 (key, value) -> 
aString.append(key).append("->").append(value).append(Character.LINE_SEPARATOR));
@@ -143,8 +137,8 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         LocalResource resource = resourceCache.getIfPresent(relativePath);
         if (resource == null) {
             FileReference resourceFile = getLocalResourceFileByName(ioManager, 
relativePath);
-            if (resourceFile.getFile().exists()) {
-                resource = readLocalResource(resourceFile.getFile());
+            resource = readLocalResource(resourceFile);
+            if (resource != null) {
                 resourceCache.put(relativePath, resource);
             }
         }
@@ -162,15 +156,15 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
                 throw new HyracksDataException("Duplicate resource: " + 
resourceFile.getAbsolutePath());
             }
 
-            final File parent = resourceFile.getFile().getParentFile();
-            if (!parent.exists() && !parent.mkdirs()) {
+            final FileReference parent = resourceFile.getParent();
+            if (!ioManager.exists(parent) && 
!ioManager.makeDirectories(parent)) {
                 throw HyracksDataException.create(CANNOT_CREATE_FILE, 
parent.getAbsolutePath());
             }
             // The next block should be all or nothing
             try {
                 createResourceFileMask(resourceFile);
                 byte[] bytes = 
OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
-                
FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), bytes);
+                ioManager.overwrite(resourceFile, bytes);
                 
indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(
                         UNINITIALIZED_COMPONENT_SEQ, 0, 
LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), null);
                 deleteResourceFileMask(resourceFile);
@@ -199,7 +193,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     private void cleanup(FileReference resourceFile) {
         if (resourceFile.getFile().exists()) {
             try {
-                IoUtil.delete(resourceFile);
+                ioManager.delete(resourceFile);
             } catch (Throwable th) {
                 LOGGER.error("Error cleaning up corrupted resource {}", 
resourceFile, th);
                 
ExitUtil.halt(ExitUtil.EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES);
@@ -210,7 +204,9 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     @Override
     public void delete(String relativePath) throws HyracksDataException {
         FileReference resourceFile = getLocalResourceFileByName(ioManager, 
relativePath);
-        boolean resourceExists = resourceFile.getFile().exists();
+        final LocalResource localResource = readLocalResource(resourceFile);
+
+        boolean resourceExists = localResource != null;
         if (isReplicationEnabled && resourceExists) {
             try {
                 createReplicationJob(ReplicationOperation.DELETE, 
resourceFile);
@@ -221,8 +217,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         synchronized (this) {
             try {
                 if (resourceExists) {
-                    final LocalResource localResource = 
readLocalResource(resourceFile.getFile());
-                    IoUtil.delete(resourceFile);
+                    ioManager.delete(resourceFile);
                     // delete all checkpoints
                     
indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
                 } else {
@@ -241,18 +236,15 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return ioManager.resolve(fileName);
     }
 
-    public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter, List<Path> roots)
-            throws HyracksDataException {
+    public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter,
+            List<FileReference> roots) throws HyracksDataException {
         Map<Long, LocalResource> resourcesMap = new HashMap<>();
-        for (Path root : roots) {
-            if (!Files.exists(root) || !Files.isDirectory(root)) {
-                continue;
-            }
-            final Collection<File> files = IoUtil.getMatchingFiles(root, 
METADATA_FILES_FILTER);
+        for (FileReference root : roots) {
+            final Collection<FileReference> files = 
ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
             try {
-                for (File file : files) {
+                for (FileReference file : files) {
                     final LocalResource localResource = 
readLocalResource(file);
-                    if (filter.test(localResource)) {
+                    if (localResource != null && filter.test(localResource)) {
                         resourcesMap.put(localResource.getId(), localResource);
                     }
                 }
@@ -270,7 +262,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
 
     public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
             throws HyracksDataException {
-        List<Path> partitionsRoots = new ArrayList<>();
+        List<FileReference> partitionsRoots = new ArrayList<>();
         for (Integer partition : partitions) {
             partitionsRoots.add(getPartitionRoot(partition));
         }
@@ -278,14 +270,15 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     public synchronized void deleteInvalidIndexes(Predicate<LocalResource> 
filter) throws HyracksDataException {
-        for (Path root : storageRoots) {
-            final Collection<File> files = IoUtil.getMatchingFiles(root, 
METADATA_FILES_FILTER);
+        for (FileReference root : storageRoots) {
+            final Collection<FileReference> files = 
ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
             try {
-                for (File file : files) {
+                for (FileReference file : files) {
                     final LocalResource localResource = 
readLocalResource(file);
-                    if (filter.test(localResource)) {
-                        LOGGER.warn("deleting invalid metadata index {}", 
file.getParentFile());
-                        IoUtil.delete(file.getParentFile());
+                    if (localResource != null && filter.test(localResource)) {
+                        FileReference parent = file.getParent();
+                        LOGGER.warn("deleting invalid metadata index {}", 
parent);
+                        ioManager.delete(parent);
                     }
                 }
             } catch (IOException e) {
@@ -319,10 +312,14 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
                 : (path + File.separator + 
StorageConstants.METADATA_FILE_NAME);
     }
 
-    private LocalResource readLocalResource(File file) throws 
HyracksDataException {
-        final Path path = Paths.get(file.getAbsolutePath());
+    private LocalResource readLocalResource(FileReference fileRef) throws 
HyracksDataException {
+        byte[] bytes = ioManager.readAllBytes(fileRef);
+        if (bytes == null) {
+            return null;
+        }
+
         try {
-            final JsonNode jsonNode = 
OBJECT_MAPPER.readValue(Files.readAllBytes(path), JsonNode.class);
+            final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes, 
JsonNode.class);
             LocalResource resource = (LocalResource) 
persistedResourceRegistry.deserialize(jsonNode);
             if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) {
                 return resource;
@@ -358,15 +355,10 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
 
     /**
      * Deletes physical files of all data verses.
-     *
-     * @throws IOException
      */
-    public synchronized void deleteStorageData() throws IOException {
-        for (Path root : storageRoots) {
-            final File rootFile = root.toFile();
-            if (rootFile.exists()) {
-                FileUtils.deleteDirectory(rootFile);
-            }
+    public synchronized void deleteStorageData() throws HyracksDataException {
+        for (FileReference root : storageRoots) {
+            ioManager.deleteDirectory(root);
         }
         createStorageRoots();
     }
@@ -392,15 +384,15 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
      * @return The set of indexes files
      * @throws HyracksDataException
      */
-    public synchronized Set<File> getPartitionIndexes(int partition) throws 
HyracksDataException {
-        Path partitionRoot = getPartitionRoot(partition);
+    public synchronized Set<FileReference> getPartitionIndexes(int partition) 
throws HyracksDataException {
+        FileReference partitionRoot = getPartitionRoot(partition);
         final Map<Long, LocalResource> partitionResourcesMap = 
getResources(resource -> {
             DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
             return dsResource.getPartition() == partition;
         }, Collections.singletonList(partitionRoot));
-        Set<File> indexes = new HashSet<>();
+        Set<FileReference> indexes = new HashSet<>();
         for (LocalResource localResource : partitionResourcesMap.values()) {
-            indexes.add(ioManager.resolve(localResource.getPath()).getFile());
+            indexes.add(ioManager.resolve(localResource.getPath()));
         }
         return indexes;
     }
@@ -426,15 +418,15 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     public synchronized List<String> getPartitionReplicatedFiles(int 
partition, IReplicationStrategy strategy)
             throws HyracksDataException {
         final List<String> partitionReplicatedFiles = new ArrayList<>();
-        final Set<File> replicatedIndexes = new HashSet<>();
+        final Set<FileReference> replicatedIndexes = new HashSet<>();
         final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
         for (LocalResource lr : partitionResources.values()) {
             DatasetLocalResource datasetLocalResource = (DatasetLocalResource) 
lr.getResource();
             if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
-                
replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile());
+                replicatedIndexes.add(ioManager.resolve(lr.getPath()));
             }
         }
-        for (File indexDir : replicatedIndexes) {
+        for (FileReference indexDir : replicatedIndexes) {
             partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
         }
         return partitionReplicatedFiles;
@@ -455,31 +447,23 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return maxComponentId;
     }
 
-    private List<String> getIndexFiles(File indexDir) {
+    private List<String> getIndexFiles(FileReference indexDir) throws 
HyracksDataException {
         final List<String> indexFiles = new ArrayList<>();
-        if (indexDir.isDirectory()) {
-            File[] indexFilteredFiles = 
indexDir.listFiles(LSM_INDEX_FILES_FILTER);
-            if (indexFilteredFiles != null) {
-                
Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add);
-            }
-        }
+        Collection<FileReference> indexFilteredFiles = 
ioManager.getMatchingFiles(indexDir, LSM_INDEX_FILES_FILTER);
+        
indexFilteredFiles.stream().map(FileReference::getAbsolutePath).forEach(indexFiles::add);
         return indexFiles;
     }
 
     private void createStorageRoots() {
-        for (Path root : storageRoots) {
-            try {
-                Files.createDirectories(root);
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to create storage root 
directory at " + root, e);
-            }
+        for (FileReference root : storageRoots) {
+            ioManager.makeDirectories(root);
         }
     }
 
     public synchronized void cleanup(int partition) throws 
HyracksDataException {
-        final Set<File> partitionIndexes = getPartitionIndexes(partition);
+        final Set<FileReference> partitionIndexes = 
getPartitionIndexes(partition);
         try {
-            for (File index : partitionIndexes) {
+            for (FileReference index : partitionIndexes) {
                 deleteIndexMaskedFiles(index);
                 if (isValidIndex(index)) {
                     deleteIndexInvalidComponents(index);
@@ -504,30 +488,27 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     public synchronized void deleteCorruptedResources() throws 
HyracksDataException {
-        for (Path root : storageRoots) {
-            final Collection<File> metadataMaskFiles = 
IoUtil.getMatchingFiles(root, METADATA_MASK_FILES_FILTER);
-            for (File metadataMaskFile : metadataMaskFiles) {
-                final File resourceFile = new 
File(metadataMaskFile.getParent(), METADATA_FILE_NAME);
-                if (resourceFile.exists()) {
-                    IoUtil.delete(resourceFile);
-                }
-                IoUtil.delete(metadataMaskFile);
+        for (FileReference root : storageRoots) {
+            final Collection<FileReference> metadataMaskFiles =
+                    ioManager.getMatchingFiles(root, 
METADATA_MASK_FILES_FILTER);
+            for (FileReference metadataMaskFile : metadataMaskFiles) {
+                final FileReference resourceFile = 
metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
+                ioManager.delete(resourceFile);
+                ioManager.delete(metadataMaskFile);
             }
         }
     }
 
-    private void deleteIndexMaskedFiles(File index) throws IOException {
-        File[] masks = index.listFiles(MASK_FILES_FILTER);
-        if (masks != null) {
-            for (File mask : masks) {
-                deleteIndexMaskedFiles(index, mask);
-                // delete the mask itself
-                Files.delete(mask.toPath());
-            }
+    private void deleteIndexMaskedFiles(FileReference index) throws 
IOException {
+        Collection<FileReference> masks = ioManager.getMatchingFiles(index, 
MASK_FILES_FILTER);
+        for (FileReference mask : masks) {
+            deleteIndexMaskedFiles(index, mask);
+            // delete the mask itself
+            ioManager.delete(mask);
         }
     }
 
-    private boolean isValidIndex(File index) throws IOException {
+    private boolean isValidIndex(FileReference index) throws IOException {
         // any index without any checkpoint files is invalid
         // this can happen if a crash happens when the index metadata file is 
created
         // but before the initial checkpoint is persisted. The index metadata 
file will
@@ -535,46 +516,46 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return getIndexCheckpointManager(index).getCheckpointCount() != 0;
     }
 
-    private void deleteIndexInvalidComponents(File index) throws IOException, 
ParseException {
-        final File[] indexComponentFiles = 
index.listFiles(COMPONENT_FILES_FILTER);
+    private void deleteIndexInvalidComponents(FileReference index) throws 
IOException, ParseException {
+        final Collection<FileReference> indexComponentFiles = 
ioManager.getMatchingFiles(index, COMPONENT_FILES_FILTER);
         if (indexComponentFiles == null) {
             throw new IOException(index + " doesn't exist or an IO error 
occurred");
         }
         final long validComponentSequence = 
getIndexCheckpointManager(index).getValidComponentSequence();
-        for (File componentFile : indexComponentFiles) {
+        for (FileReference componentFileRef : indexComponentFiles) {
             // delete any file with start or end sequence > valid component 
sequence
-            final long fileStart = 
IndexComponentFileReference.of(componentFile.getName()).getSequenceStart();
-            final long fileEnd = 
IndexComponentFileReference.of(componentFile.getName()).getSequenceEnd();
+            final long fileStart = 
IndexComponentFileReference.of(componentFileRef.getName()).getSequenceStart();
+            final long fileEnd = 
IndexComponentFileReference.of(componentFileRef.getName()).getSequenceEnd();
             if (fileStart > validComponentSequence || fileEnd > 
validComponentSequence) {
-                LOGGER.warn(() -> "Deleting invalid component file " + 
componentFile.getAbsolutePath()
+                LOGGER.warn(() -> "Deleting invalid component file " + 
componentFileRef.getAbsolutePath()
                         + " based on valid sequence " + 
validComponentSequence);
-                Files.delete(componentFile.toPath());
+                ioManager.delete(componentFileRef);
             }
         }
     }
 
-    private IIndexCheckpointManager getIndexCheckpointManager(File index) 
throws HyracksDataException {
-        final String indexFile = Paths.get(index.getAbsolutePath(), 
StorageConstants.METADATA_FILE_NAME).toString();
+    private IIndexCheckpointManager getIndexCheckpointManager(FileReference 
index) throws HyracksDataException {
+        final String indexFile = 
index.getChild(METADATA_FILE_NAME).getAbsolutePath();
         final ResourceReference indexRef = ResourceReference.of(indexFile);
         return indexCheckpointManagerProvider.get(indexRef);
     }
 
-    private void deleteIndexMaskedFiles(File index, File mask) throws 
IOException {
-        if (!mask.getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
+    private void deleteIndexMaskedFiles(FileReference index, FileReference 
mask) throws IOException {
+        if 
(!mask.getFile().getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
             throw new IllegalArgumentException("Unrecognized mask file: " + 
mask);
         }
-        File[] maskedFiles;
+        Collection<FileReference> maskedFiles;
         if (isComponentMask(mask)) {
             final String componentId = 
mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length());
-            maskedFiles = index.listFiles((dir, name) -> 
name.startsWith(componentId));
+            maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> 
name.startsWith(componentId));
         } else {
             final String maskedFileName = 
mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length());
-            maskedFiles = index.listFiles((dir, name) -> 
name.equals(maskedFileName));
+            maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> 
name.equals(maskedFileName));
         }
         if (maskedFiles != null) {
-            for (File maskedFile : maskedFiles) {
+            for (FileReference maskedFile : maskedFiles) {
                 LOGGER.info(() -> "deleting masked file: " + 
maskedFile.getAbsolutePath());
-                Files.delete(maskedFile.toPath());
+                ioManager.delete(maskedFile);
             }
         }
     }
@@ -583,13 +564,13 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         try {
             final FileReference resolvedPath = 
ioManager.resolve(resource.getRelativePath().toString());
             long totalSize = 0;
-            final File[] indexFiles = resolvedPath.getFile().listFiles();
+            final Collection<FileReference> indexFiles = 
ioManager.list(resolvedPath);
             final Map<String, Long> componentsStats = new HashMap<>();
             if (indexFiles != null) {
-                for (File file : indexFiles) {
-                    long fileSize = file.length();
+                for (FileReference file : indexFiles) {
+                    long fileSize = ioManager.getSize(file);
                     totalSize += fileSize;
-                    if (isComponentFile(resolvedPath.getFile(), 
file.getName())) {
+                    if (isComponentFile(resolvedPath, file.getName())) {
                         String componentSeq = 
getComponentSequence(file.getAbsolutePath());
                         componentsStats.put(componentSeq, 
componentsStats.getOrDefault(componentSeq, 0L) + fileSize);
                     }
@@ -621,73 +602,69 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     private void createResourceFileMask(FileReference resourceFile) throws 
HyracksDataException {
-        Path maskFile = getResourceMaskFilePath(resourceFile);
+        FileReference maskFile = getResourceMaskFilePath(resourceFile);
         try {
-            Files.createFile(maskFile);
+            ioManager.create(maskFile);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
     }
 
     private void deleteResourceFileMask(FileReference resourceFile) throws 
HyracksDataException {
-        Path maskFile = getResourceMaskFilePath(resourceFile);
-        IoUtil.delete(maskFile);
+        FileReference maskFile = getResourceMaskFilePath(resourceFile);
+        ioManager.delete(maskFile);
     }
 
-    private Path getResourceMaskFilePath(FileReference resourceFile) {
-        return 
Paths.get(resourceFile.getFile().getParentFile().getAbsolutePath(), 
METADATA_FILE_MASK_NAME);
+    private FileReference getResourceMaskFilePath(FileReference resourceFile) {
+        FileReference resourceFileParent = resourceFile.getParent();
+        return resourceFileParent.getChild(METADATA_FILE_MASK_NAME);
     }
 
-    private static boolean isComponentMask(File mask) {
+    private static boolean isComponentMask(FileReference mask) {
         return 
mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX);
     }
 
-    private static boolean isComponentFile(File indexDir, String fileName) {
-        return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
-    }
-
-    public List<Path> getStorageRoots() {
-        return storageRoots;
+    private static boolean isComponentFile(FileReference indexDir, String 
fileName) {
+        return COMPONENT_FILES_FILTER.accept(indexDir.getFile(), fileName);
     }
 
-    public synchronized void keepPartitions(Set<Integer> keepPartitions) {
-        List<File> onDiskPartitions = getOnDiskPartitions();
-        for (File onDiskPartition : onDiskPartitions) {
+    public synchronized void keepPartitions(Set<Integer> keepPartitions) 
throws HyracksDataException {
+        List<FileReference> onDiskPartitions = getOnDiskPartitions();
+        for (FileReference onDiskPartition : onDiskPartitions) {
             int partitionNum = 
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
             if (!keepPartitions.contains(partitionNum)) {
                 LOGGER.warn("deleting partition {} since it is not on 
partitions to keep {}", partitionNum,
                         keepPartitions);
-                FileUtils.deleteQuietly(onDiskPartition);
+                ioManager.delete(onDiskPartition);
             }
         }
     }
 
-    public synchronized List<File> getOnDiskPartitions() {
-        List<File> onDiskPartitions = new ArrayList<>();
-        for (Path root : storageRoots) {
-            File[] partitions = root.toFile().listFiles(
+    public synchronized List<FileReference> getOnDiskPartitions() throws 
HyracksDataException {
+        List<FileReference> onDiskPartitions = new ArrayList<>();
+        for (FileReference root : storageRoots) {
+            Collection<FileReference> partitions = ioManager.list(root,
                     (dir, name) -> dir.isDirectory() && 
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
             if (partitions != null) {
-                onDiskPartitions.addAll(Arrays.asList(partitions));
+                onDiskPartitions.addAll(partitions);
             }
         }
         return onDiskPartitions;
     }
 
-    public Path getPartitionRoot(int partition) throws HyracksDataException {
-        Path path =
-                Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, 
StorageConstants.PARTITION_DIR_PREFIX + partition);
-        FileReference resolve = ioManager.resolve(path.toString());
-        return resolve.getFile().toPath();
+    public FileReference getPartitionRoot(int partition) throws 
HyracksDataException {
+        String path = StorageConstants.STORAGE_ROOT_DIR_NAME + File.separator 
+ StorageConstants.PARTITION_DIR_PREFIX
+                + partition;
+        return ioManager.resolve(path);
     }
 
-    public void deletePartition(int partitionId) {
-        List<File> onDiskPartitions = getOnDiskPartitions();
-        for (File onDiskPartition : onDiskPartitions) {
+    public void deletePartition(int partitionId) throws HyracksDataException {
+        Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
+        for (FileReference onDiskPartition : onDiskPartitions) {
             int partitionNum = 
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
             if (partitionNum == partitionId) {
                 LOGGER.warn("deleting partition {}", partitionNum);
-                FileUtils.deleteQuietly(onDiskPartition);
+                ioManager.delete(onDiskPartition);
                 return;
             }
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
index cec1598ca4..c826ed7c97 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
@@ -97,6 +97,10 @@ public class FileReference implements Serializable {
         return path + File.separator + name;
     }
 
+    public String getName() {
+        return file.getName();
+    }
+
     public void register() {
         if (registrationTime != 0) {
             throw new IllegalStateException(
@@ -125,7 +129,7 @@ public class FileReference implements Serializable {
         if (parentIndex < 0) {
             return new FileReference(dev, "");
         }
-        String parentPath = path.substring(parentIndex);
+        String parentPath = path.substring(0, parentIndex);
         return new FileReference(dev, parentPath);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 5f93c21a26..07e1c78a5f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -67,6 +67,8 @@ public interface IIOManager extends Closeable {
 
     long getSize(IFileHandle fileHandle);
 
+    long getSize(FileReference fileReference);
+
     WritableByteChannel newWritableChannel(IFileHandle fileHandle);
 
     void deleteWorkspaceFiles() throws HyracksDataException;
@@ -134,11 +136,16 @@ public interface IIOManager extends Closeable {
 
     void deleteDirectory(FileReference root) throws HyracksDataException;
 
+    // TODO: Remove and use list
     Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter) throws HyracksDataException;
 
     boolean exists(FileReference fileRef);
 
     void create(FileReference fileRef) throws HyracksDataException;
 
+    boolean makeDirectories(FileReference resourceDir);
+
+    void cleanDirectory(FileReference resourceDir) throws HyracksDataException;
+
     void syncFiles(Set<Integer> activePartitions) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index 6c74838523..8b3fdec1fc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -92,6 +92,14 @@ public class FileHandle implements IFileHandle {
         return raf.getChannel();
     }
 
+    public void setLength(long newLength) throws HyracksDataException {
+        try {
+            raf.setLength(newLength);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
     public synchronized void ensureOpen() throws HyracksDataException {
         if (raf == null || !raf.getChannel().isOpen()) {
             try {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 4eebf6b898..24d1061ae3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -400,7 +400,12 @@ public class IOManager implements IIOManager {
 
     @Override
     public long getSize(IFileHandle fileHandle) {
-        return fileHandle.getFileReference().getFile().length();
+        return getSize(fileHandle.getFileReference());
+    }
+
+    @Override
+    public long getSize(FileReference fileReference) {
+        return fileReference.getFile().length();
     }
 
     @Override
@@ -577,7 +582,7 @@ public class IOManager implements IIOManager {
     public Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter)
             throws HyracksDataException {
         File rootFile = root.getFile();
-        if (!rootFile.exists()) {
+        if (!rootFile.exists() || !rootFile.isDirectory()) {
             return Collections.emptyList();
         }
 
@@ -600,6 +605,20 @@ public class IOManager implements IIOManager {
         IoUtil.create(fileRef);
     }
 
+    @Override
+    public boolean makeDirectories(FileReference resourceDir) {
+        return resourceDir.getFile().mkdirs();
+    }
+
+    @Override
+    public void cleanDirectory(FileReference resourceDir) throws HyracksDataException {
+        try {
+            FileUtils.cleanDirectory(resourceDir.getFile());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
     @Override
     public void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
         try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 91316dc4ef..08eba0df68 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -61,15 +61,17 @@ public class LSMBTreeFileManager extends AbstractLSMIndexFileManager {
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
         String baseName = getNextComponentSequence(BTREE_FILTER);
-        return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null,
-                hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
+        return new LSMComponentFileReferences(getCompressedFileReferenceIfAny(baseName + DELIMITER + BTREE_SUFFIX),
+                null,
+                hasBloomFilter ? getCompressedFileReferenceIfAny(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
     }
 
     @Override
     public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
         final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
-        return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null,
-                hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
+        return new LSMComponentFileReferences(getCompressedFileReferenceIfAny(baseName + DELIMITER + BTREE_SUFFIX),
+                null,
+                hasBloomFilter ? getCompressedFileReferenceIfAny(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 4da207da00..610232f812 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hyracks.api.compression.ICompressorDecompressor;
 import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
@@ -32,7 +33,6 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
@@ -139,9 +139,9 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
     protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
             TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles,
             IBufferCache bufferCache) throws HyracksDataException {
-        String[] files = listDirFiles(baseDir, filter);
-        for (String fileName : files) {
-            FileReference fileRef = getFileReference(fileName);
+        Set<FileReference> files = ioManager.list(baseDir, filter);
+        for (FileReference filePath : files) {
+            FileReference fileRef = getCompressedFileReferenceIfAny(filePath.getName());
             if (treeFactory == null) {
                 allFiles.add(IndexComponentFileReference.of(fileRef));
                 continue;
@@ -155,24 +155,6 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
         }
     }
 
-    static String[] listDirFiles(FileReference dir, FilenameFilter filter) throws HyracksDataException {
-        /*
-         * Returns null if this abstract pathname does not denote a directory, or if an I/O error occurs.
-         */
-        String[] files = dir.getFile().list(filter);
-        if (files == null) {
-            if (!dir.getFile().canRead()) {
-                throw HyracksDataException.create(ErrorCode.CANNOT_READ_FILE, dir);
-            } else if (!dir.getFile().exists()) {
-                throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, dir);
-            } else if (!dir.getFile().isDirectory()) {
-                throw HyracksDataException.create(ErrorCode.FILE_IS_NOT_DIRECTORY, dir);
-            }
-            throw HyracksDataException.create(ErrorCode.UNIDENTIFIED_IO_ERROR_READING_FILE, dir);
-        }
-        return files;
-    }
-
     protected void validateFiles(HashSet<String> groundTruth, ArrayList<IndexComponentFileReference> validFiles,
             FilenameFilter filter, TreeIndexFactory<? extends ITreeIndex> treeFactory, IBufferCache bufferCache)
             throws HyracksDataException {
@@ -189,15 +171,15 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
 
     @Override
     public void createDirs() throws HyracksDataException {
-        if (baseDir.getFile().exists()) {
+        if (ioManager.exists(baseDir)) {
             throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_EXISTING_INDEX);
         }
-        baseDir.getFile().mkdirs();
+        ioManager.makeDirectories(baseDir);
     }
 
     @Override
     public void deleteDirs() throws HyracksDataException {
-        IoUtil.delete(baseDir);
+        ioManager.deleteDirectory(baseDir);
     }
 
     @Override
@@ -314,7 +296,7 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
         return IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
     }
 
-    protected FileReference getFileReference(String name) {
+    protected FileReference getCompressedFileReferenceIfAny(String name) {
         final ICompressorDecompressor compDecomp = compressorDecompressorFactory.createInstance();
         //Avoid creating LAF file for NoOpCompressorDecompressor
         if (compDecomp != NoOpCompressorDecompressor.INSTANCE && isCompressible(name)) {
@@ -331,9 +313,9 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
 
     private long getOnDiskLastUsedComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
         long maxComponentSeq = -1;
-        final String[] files = listDirFiles(baseDir, filenameFilter);
-        for (String fileName : files) {
-            maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(fileName).getSequenceEnd());
+        final Set<FileReference> files = ioManager.list(baseDir, filenameFilter);
+        for (FileReference file : files) {
+            maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(file).getSequenceEnd());
         }
         return maxComponentSeq;
     }

Reply via email to