This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new f406f2a2a4 [ASTERIXDB-3184][STO] Unify I/O APIs
f406f2a2a4 is described below
commit f406f2a2a4659fa5e70c7d112692bb055080d5a3
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Tue May 16 16:38:07 2023 -0700
[ASTERIXDB-3184][STO] Unify I/O APIs
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Unify file operations by using IIOManager
Change-Id: Ifb77c24ee855537bcf725b2eb1e28b1af200f3ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17524
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../asterix/app/nc/IndexCheckpointManager.java | 62 ++---
.../app/nc/IndexCheckpointManagerProvider.java | 6 +-
.../org/apache/asterix/cloud/CloudIOManager.java | 12 +-
.../ioopcallbacks/LSMIOOperationCallback.java | 12 +-
.../asterix/common/utils/StoragePathUtil.java | 6 +-
.../messaging/CheckpointPartitionIndexesTask.java | 9 +-
.../replication/messaging/ComponentMaskTask.java | 16 +-
.../replication/messaging/DeleteFileTask.java | 9 +-
.../replication/messaging/DropIndexTask.java | 11 +-
.../messaging/MarkComponentValidTask.java | 9 +-
.../replication/messaging/ReplicateFileTask.java | 33 ++-
.../PersistentLocalResourceRepository.java | 255 ++++++++++-----------
.../org/apache/hyracks/api/io/FileReference.java | 6 +-
.../java/org/apache/hyracks/api/io/IIOManager.java | 7 +
.../apache/hyracks/control/nc/io/FileHandle.java | 8 +
.../apache/hyracks/control/nc/io/IOManager.java | 23 +-
.../am/lsm/btree/impls/LSMBTreeFileManager.java | 10 +-
.../common/impls/AbstractLSMIndexFileManager.java | 40 +---
18 files changed, 268 insertions(+), 266 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 290734f046..2ed163812e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -18,15 +18,11 @@
*/
package org.apache.asterix.app.nc;
-import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -34,9 +30,11 @@ import
org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IndexCheckpoint;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
import
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.util.annotations.ThreadSafe;
-import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -49,10 +47,12 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
private static final FilenameFilter CHECKPOINT_FILE_FILTER =
(file, name) ->
name.startsWith(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX);
private static final long BULKLOAD_LSN = 0;
- private final Path indexPath;
+ private final FileReference indexPath;
+ private final IIOManager ioManager;
- public IndexCheckpointManager(Path indexPath) {
+ public IndexCheckpointManager(FileReference indexPath, IIOManager
ioManager) {
this.indexPath = indexPath;
+ this.ioManager = ioManager;
}
@Override
@@ -154,7 +154,7 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
}
if (checkpoints.isEmpty()) {
LOGGER.warn("Couldn't find any checkpoint file for index {}.
Content of dir are {}.", indexPath,
- Arrays.toString(indexPath.toFile().listFiles()));
+ ioManager.getMatchingFiles(indexPath,
IoUtil.NO_OP_FILTER).toString());
throw new IllegalStateException("Couldn't find any checkpoints for
resource: " + indexPath);
}
checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
@@ -180,13 +180,13 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
}
}
- private List<IndexCheckpoint> getCheckpoints() throws
ClosedByInterruptException {
+ private List<IndexCheckpoint> getCheckpoints() throws
ClosedByInterruptException, HyracksDataException {
List<IndexCheckpoint> checkpoints = new ArrayList<>();
- final File[] checkpointFiles =
indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
- if (checkpointFiles != null) {
- for (File checkpointFile : checkpointFiles) {
+ final Collection<FileReference> checkpointFiles =
ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+ if (!checkpointFiles.isEmpty()) {
+ for (FileReference checkpointFile : checkpointFiles) {
try {
- checkpoints.add(read(checkpointFile.toPath()));
+ checkpoints.add(read(checkpointFile));
} catch (ClosedByInterruptException e) {
throw e;
} catch (IOException e) {
@@ -198,14 +198,14 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
}
private void persist(IndexCheckpoint checkpoint) throws
HyracksDataException {
- final Path checkpointPath = getCheckpointPath(checkpoint);
+ final FileReference checkpointPath = getCheckpointPath(checkpoint);
for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
try {
// clean up from previous write failure
- if (checkpointPath.toFile().exists()) {
- Files.delete(checkpointPath);
+ if (ioManager.exists(checkpointPath)) {
+ ioManager.delete(checkpointPath);
}
- FileUtil.writeAndForce(checkpointPath,
checkpoint.asJson().getBytes());
+ ioManager.overwrite(checkpointPath,
checkpoint.asJson().getBytes());
// ensure it was written correctly by reading it
read(checkpointPath);
return;
@@ -223,17 +223,18 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
}
}
- private IndexCheckpoint read(Path checkpointPath) throws IOException {
- return IndexCheckpoint.fromJson(new
String(Files.readAllBytes(checkpointPath)));
+ private IndexCheckpoint read(FileReference checkpointPath) throws
IOException {
+ return IndexCheckpoint.fromJson(new
String(ioManager.readAllBytes(checkpointPath)));
}
private void deleteHistory(long latestId, int historyToKeep) {
try {
- final File[] checkpointFiles =
indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
- if (checkpointFiles != null) {
- for (File checkpointFile : checkpointFiles) {
- if (getCheckpointIdFromFileName(checkpointFile.toPath()) <
(latestId - historyToKeep)) {
- Files.delete(checkpointFile.toPath());
+ final Collection<FileReference> checkpointFiles =
+ ioManager.getMatchingFiles(indexPath,
CHECKPOINT_FILE_FILTER);
+ if (!checkpointFiles.isEmpty()) {
+ for (FileReference checkpointFile : checkpointFiles) {
+ if (getCheckpointIdFromFileName(checkpointFile) <
(latestId - historyToKeep)) {
+ ioManager.delete(checkpointFile);
}
}
}
@@ -242,13 +243,12 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
}
}
- private Path getCheckpointPath(IndexCheckpoint checkpoint) {
- return Paths.get(indexPath.toString(),
- StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX +
String.valueOf(checkpoint.getId()));
+ private FileReference getCheckpointPath(IndexCheckpoint checkpoint) {
+ return
indexPath.getChild(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX +
checkpoint.getId());
}
- private long getCheckpointIdFromFileName(Path checkpointPath) {
- return Long.valueOf(checkpointPath.getFileName().toString()
-
.substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
+ private long getCheckpointIdFromFileName(FileReference checkpointPath) {
+ return Long
+
.parseLong(checkpointPath.getName().substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
index e0b3105a80..1e08ed87d4 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.app.nc;
-import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
@@ -27,6 +26,7 @@ import
org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
public class IndexCheckpointManagerProvider implements
IIndexCheckpointManagerProvider {
@@ -54,8 +54,8 @@ public class IndexCheckpointManagerProvider implements
IIndexCheckpointManagerPr
private IndexCheckpointManager create(ResourceReference ref) {
try {
- final Path indexPath = StoragePathUtil.getIndexPath(ioManager,
ref);
- return new IndexCheckpointManager(indexPath);
+ final FileReference indexPath =
StoragePathUtil.getIndexPath(ioManager, ref);
+ return new IndexCheckpointManager(indexPath, ioManager);
} catch (HyracksDataException e) {
throw new IllegalStateException(e);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
index 6f4ce696ed..97d8d80889 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
@@ -18,7 +18,8 @@
*/
package org.apache.asterix.cloud;
-import static org.apache.asterix.common.utils.StorageConstants.*;
+import static
org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static
org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
import java.io.File;
import java.io.FilenameFilter;
@@ -151,6 +152,7 @@ public class CloudIOManager extends IOManager {
super.close(fHandle);
}
+ // TODO This method should not do any syncing. It simply should list the
files
@Override
public Set<FileReference> list(FileReference dir, FilenameFilter filter)
throws HyracksDataException {
Set<String> cloudFiles = cloudClient.listObjects(bucket,
dir.getRelativePath(), filter);
@@ -222,6 +224,14 @@ public class CloudIOManager extends IOManager {
return super.getSize(fileHandle);
}
+ @Override
+ public long getSize(FileReference fileReference) {
+ if (!fileReference.getFile().exists()) {
+ return cloudClient.getObjectSize(bucket,
fileReference.getRelativePath());
+ }
+ return super.getSize(fileReference);
+ }
+
@Override
public void overwrite(FileReference fileRef, byte[] bytes) throws
ClosedByInterruptException, HyracksDataException {
super.overwrite(fileRef, bytes);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 1189b51960..999636da13 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -21,8 +21,6 @@ package org.apache.asterix.common.ioopcallbacks;
import static
org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
@@ -77,7 +75,7 @@ public class LSMIOOperationCallback implements
ILSMIOOperationCallback {
private long firstLsnForCurrentMemoryComponent = 0L;
private long persistenceLsn = 0L;
private int pendingFlushes = 0;
- private Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
+ private final Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex,
ILSMComponentId componentId,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
@@ -307,10 +305,8 @@ public class LSMIOOperationCallback implements
ILSMIOOperationCallback {
private static FileReference getOperationMaskFilePath(ILSMIOOperation
operation) {
FileReference target = operation.getTarget();
- final String componentSequence =
getComponentSequence(target.getFile().getAbsolutePath());
- Path idxRelPath = Paths.get(target.getRelativePath()).getParent();
- Path maskFileRelPath =
- Paths.get(idxRelPath.toString(),
StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
- return new FileReference(target.getDeviceHandle(),
maskFileRelPath.toString());
+ String componentSequence =
getComponentSequence(target.getFile().getAbsolutePath());
+ FileReference idxRelPath = target.getParent();
+ return idxRelPath.getChild(StorageConstants.COMPONENT_MASK_FILE_PREFIX
+ componentSequence);
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index a9ed066009..9702b18491 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -19,7 +19,6 @@
package org.apache.asterix.common.utils;
import java.io.File;
-import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
@@ -31,6 +30,7 @@ import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConst
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.DefaultIoDeviceFileSplit;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.MappedFileSplit;
@@ -180,7 +180,7 @@ public class StoragePathUtil {
* @return
* @throws HyracksDataException
*/
- public static Path getIndexPath(IIOManager ioManager, ResourceReference
ref) throws HyracksDataException {
- return
ioManager.resolve(ref.getRelativePath().toString()).getFile().toPath();
+ public static FileReference getIndexPath(IIOManager ioManager,
ResourceReference ref) throws HyracksDataException {
+ return ioManager.resolve(ref.getRelativePath().toString());
}
}
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 97b6556040..a6ba0e2916 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -24,7 +24,6 @@ import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.file.Path;
import java.util.Collection;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -36,6 +35,7 @@ import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
@@ -69,14 +69,15 @@ public class CheckpointPartitionIndexesTask implements
IReplicaTask {
DatasetResourceReference ref = DatasetResourceReference.of(ls);
final IIndexCheckpointManager indexCheckpointManager =
indexCheckpointManagerProvider.get(ref);
// Get most recent sequence of existing files to avoid deletion
- Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
- String[] files =
indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+ FileReference indexPath = StoragePathUtil.getIndexPath(ioManager,
ref);
+ Collection<FileReference> files =
+ ioManager.getMatchingFiles(indexPath,
AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
if (files == null) {
throw HyracksDataException
.create(new IOException(indexPath + " is not a
directory or an IO Error occurred"));
}
long maxComponentSequence = UNINITIALIZED_COMPONENT_SEQ;
- for (String file : files) {
+ for (FileReference file : files) {
maxComponentSequence =
Math.max(maxComponentSequence,
IndexComponentFileReference.of(file).getSequenceEnd());
}
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index 3f04bd2c5c..e9af85c204 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -22,9 +22,6 @@ import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
@@ -50,21 +47,22 @@ public class ComponentMaskTask implements IReplicaTask {
@Override
public void perform(INcApplicationContext appCtx, IReplicationWorker
worker) {
try {
+ IIOManager ioManager = appCtx.getIoManager();
// create mask
- final Path maskPath = getComponentMaskPath(appCtx, file);
- Files.createFile(maskPath);
+ final FileReference maskPath = getComponentMaskPath(ioManager,
file);
+ ioManager.create(maskPath);
ReplicationProtocol.sendAck(worker.getChannel(),
worker.getReusableBuffer());
} catch (IOException e) {
throw new ReplicationException(e);
}
}
- public static Path getComponentMaskPath(INcApplicationContext appCtx,
String componentFile) throws IOException {
- final IIOManager ioManager = appCtx.getIoManager();
+ public static FileReference getComponentMaskPath(IIOManager ioManager,
String componentFile) throws IOException {
final FileReference localPath = ioManager.resolve(componentFile);
- final Path resourceDir =
Files.createDirectories(localPath.getFile().getParentFile().toPath());
+ final FileReference resourceDir = localPath.getParent();
+ ioManager.makeDirectories(resourceDir);
final String componentSequence =
ResourceReference.getComponentSequence(componentFile);
- return Paths.get(resourceDir.toString(),
StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
+ return
resourceDir.getChild(StorageConstants.COMPONENT_MASK_FILE_PREFIX +
componentSequence);
}
@Override
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
index 92e4989b40..a00acfbdea 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -20,10 +20,8 @@ package org.apache.asterix.replication.messaging;
import java.io.DataInput;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.file.Files;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
@@ -32,6 +30,7 @@ import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,9 +51,9 @@ public class DeleteFileTask implements IReplicaTask {
public void perform(INcApplicationContext appCtx, IReplicationWorker
worker) {
try {
final IIOManager ioManager = appCtx.getIoManager();
- final File localFile = ioManager.resolve(file).getFile();
- if (localFile.exists()) {
- Files.delete(localFile.toPath());
+ final FileReference localFile = ioManager.resolve(file);
+ if (ioManager.exists(localFile)) {
+ ioManager.delete(localFile);
ResourceReference replicaRes =
ResourceReference.of(localFile.getAbsolutePath());
if (replicaRes.isMetadataResource()) {
((PersistentLocalResourceRepository)
appCtx.getLocalResourceRepository())
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
index 483561b777..2b0e2d8cf5 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
@@ -20,7 +20,6 @@ package org.apache.asterix.replication.messaging;
import java.io.DataInput;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
@@ -31,8 +30,8 @@ import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,10 +51,10 @@ public class DropIndexTask implements IReplicaTask {
public void perform(INcApplicationContext appCtx, IReplicationWorker
worker) {
try {
final IIOManager ioManager = appCtx.getIoManager();
- final File indexFile = ioManager.resolve(file).getFile();
- if (indexFile.exists()) {
- File indexDir = indexFile.getParentFile();
- IoUtil.delete(indexDir);
+ final FileReference indexFile = ioManager.resolve(file);
+ if (ioManager.exists(indexFile)) {
+ FileReference indexDir = indexFile.getParent();
+ ioManager.deleteDirectory(indexDir);
((PersistentLocalResourceRepository)
appCtx.getLocalResourceRepository())
.invalidateResource(ResourceReference.of(file).getRelativePath().toString());
LOGGER.info(() -> "Deleted index: " +
indexFile.getAbsolutePath());
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index fa77378619..76fde09f7b 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -22,8 +22,6 @@ import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -36,6 +34,8 @@ import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
import
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.logging.log4j.LogManager;
@@ -67,9 +67,10 @@ public class MarkComponentValidTask implements IReplicaTask {
} else if (masterLsn != IndexSynchronizer.MERGE_LSN) {
ensureComponentLsnFlushed(appCtx);
}
+ IIOManager ioManager = appCtx.getIoManager();
// delete mask
- final Path maskPath =
ComponentMaskTask.getComponentMaskPath(appCtx, file);
- Files.delete(maskPath);
+ final FileReference maskPath =
ComponentMaskTask.getComponentMaskPath(ioManager, file);
+ ioManager.delete(maskPath);
ReplicationProtocol.sendAck(worker.getChannel(),
worker.getReusableBuffer());
} catch (IOException | InterruptedException e) {
throw new ReplicationException(e);
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 500a5def24..71ed63ef9d 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -24,11 +24,7 @@ import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
@@ -40,10 +36,10 @@ import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.replication.management.NetworkingUtil;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.FileHandle;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -73,31 +69,34 @@ public class ReplicateFileTask implements IReplicaTask {
final IIOManager ioManager = appCtx.getIoManager();
// resolve path
final FileReference localPath = ioManager.resolve(file);
- final Path resourceDir =
Files.createDirectories(localPath.getFile().getParentFile().toPath());
+ FileReference resourceDir = localPath.getParent();
+ ioManager.makeDirectories(resourceDir);
if (indexMetadata) {
// ensure clean index directory
- FileUtils.cleanDirectory(resourceDir.toFile());
+ ioManager.cleanDirectory(resourceDir);
((PersistentLocalResourceRepository)
appCtx.getLocalResourceRepository())
.invalidateResource(ResourceReference.of(file).getRelativePath().toString());
}
// create mask
- final Path maskPath = Paths.get(resourceDir.toString(),
- StorageConstants.MASK_FILE_PREFIX +
localPath.getFile().getName());
- Files.createFile(maskPath);
+ final FileReference maskPath =
+ resourceDir.getChild(StorageConstants.MASK_FILE_PREFIX +
localPath.getName());
+ ioManager.create(maskPath);
// receive actual file
- final Path filePath = Paths.get(resourceDir.toString(),
localPath.getFile().getName());
- Files.createFile(filePath);
- try (RandomAccessFile fileOutputStream = new
RandomAccessFile(filePath.toFile(), "rw");
- FileChannel fileChannel = fileOutputStream.getChannel()) {
- fileOutputStream.setLength(size);
+ ioManager.create(localPath);
+ FileHandle fileHandle = (FileHandle) ioManager.open(localPath,
IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try (FileChannel fileChannel = fileHandle.getFileChannel()) {
+ fileHandle.setLength(size);
NetworkingUtil.downloadFile(fileChannel, worker.getChannel());
- fileChannel.force(true);
+ ioManager.sync(fileHandle, true);
+ } finally {
+ ioManager.close(fileHandle);
}
if (indexMetadata) {
initIndexCheckpoint(appCtx);
}
//delete mask
- Files.delete(maskPath);
+ ioManager.delete(maskPath);
LOGGER.debug("received file {} from master", localPath);
ReplicationProtocol.sendAck(worker.getChannel(),
worker.getReusableBuffer());
} catch (IOException e) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 1e813462b3..8ef55e89f2 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -21,6 +21,7 @@ package org.apache.asterix.transaction.management.resource;
import static
org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
import static
org.apache.asterix.common.utils.StorageConstants.INDEX_NON_DATA_FILES_PREFIX;
import static
org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
+import static
org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
import static
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
import static
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;
@@ -28,12 +29,8 @@ import static
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFil
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.text.ParseException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -44,7 +41,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -59,7 +55,6 @@ import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.storage.ResourceStorageStats;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
@@ -68,14 +63,12 @@ import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import
org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.util.ExitUtil;
-import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -88,18 +81,20 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
private static final Logger LOGGER = LogManager.getLogger();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private static final String METADATA_FILE_MASK_NAME =
StorageConstants.MASK_FILE_PREFIX +
StorageConstants.METADATA_FILE_NAME;
private static final FilenameFilter LSM_INDEX_FILES_FILTER =
(dir, name) -> name.startsWith(METADATA_FILE_NAME) ||
!name.startsWith(INDEX_NON_DATA_FILES_PREFIX);
private static final FilenameFilter MASK_FILES_FILTER =
(dir, name) -> name.startsWith(StorageConstants.MASK_FILE_PREFIX);
- private static final int MAX_CACHED_RESOURCES = 1000;
private static final FilenameFilter METADATA_FILES_FILTER =
(dir, name) -> name.equals(StorageConstants.METADATA_FILE_NAME);
private static final FilenameFilter METADATA_MASK_FILES_FILTER =
(dir, name) -> name.equals(METADATA_FILE_MASK_NAME);
+ private static final int MAX_CACHED_RESOURCES = 1000;
+
// Finals
private final IIOManager ioManager;
private final Cache<String, LocalResource> resourceCache;
@@ -107,7 +102,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
private boolean isReplicationEnabled = false;
private Set<String> filesToBeReplicated;
private IReplicationManager replicationManager;
- private final List<Path> storageRoots;
+ private final List<FileReference> storageRoots;
private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
private final IPersistedResourceRegistry persistedResourceRegistry;
@@ -120,8 +115,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
storageRoots = new ArrayList<>();
final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
for (int i = 0; i < ioDevices.size(); i++) {
- storageRoots.add(
- Paths.get(ioDevices.get(i).getMount().getAbsolutePath(),
StorageConstants.STORAGE_ROOT_DIR_NAME));
+ storageRoots.add(new FileReference(ioDevices.get(i),
STORAGE_ROOT_DIR_NAME));
}
createStorageRoots();
resourceCache =
CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -131,7 +125,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
public String toString() {
StringBuilder aString = new
StringBuilder().append(PersistentLocalResourceRepository.class.getSimpleName())
.append(Character.LINE_SEPARATOR).append(ioManager.getClass().getSimpleName()).append(':')
-
.append(Character.LINE_SEPARATOR).append(ioManager.toString()).append(Character.LINE_SEPARATOR)
+
.append(Character.LINE_SEPARATOR).append(ioManager).append(Character.LINE_SEPARATOR)
.append("Cached Resources:").append(Character.LINE_SEPARATOR);
resourceCache.asMap().forEach(
(key, value) ->
aString.append(key).append("->").append(value).append(Character.LINE_SEPARATOR));
@@ -143,8 +137,8 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
LocalResource resource = resourceCache.getIfPresent(relativePath);
if (resource == null) {
FileReference resourceFile = getLocalResourceFileByName(ioManager,
relativePath);
- if (resourceFile.getFile().exists()) {
- resource = readLocalResource(resourceFile.getFile());
+ resource = readLocalResource(resourceFile);
+ if (resource != null) {
resourceCache.put(relativePath, resource);
}
}
@@ -162,15 +156,15 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
throw new HyracksDataException("Duplicate resource: " +
resourceFile.getAbsolutePath());
}
- final File parent = resourceFile.getFile().getParentFile();
- if (!parent.exists() && !parent.mkdirs()) {
+ final FileReference parent = resourceFile.getParent();
+ if (!ioManager.exists(parent) &&
!ioManager.makeDirectories(parent)) {
throw HyracksDataException.create(CANNOT_CREATE_FILE,
parent.getAbsolutePath());
}
// The next block should be all or nothing
try {
createResourceFileMask(resourceFile);
byte[] bytes =
OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
-
FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), bytes);
+ ioManager.overwrite(resourceFile, bytes);
indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(
UNINITIALIZED_COMPONENT_SEQ, 0,
LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), null);
deleteResourceFileMask(resourceFile);
@@ -199,7 +193,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
private void cleanup(FileReference resourceFile) {
if (resourceFile.getFile().exists()) {
try {
- IoUtil.delete(resourceFile);
+ ioManager.delete(resourceFile);
} catch (Throwable th) {
LOGGER.error("Error cleaning up corrupted resource {}",
resourceFile, th);
ExitUtil.halt(ExitUtil.EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES);
@@ -210,7 +204,9 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
@Override
public void delete(String relativePath) throws HyracksDataException {
FileReference resourceFile = getLocalResourceFileByName(ioManager,
relativePath);
- boolean resourceExists = resourceFile.getFile().exists();
+ final LocalResource localResource = readLocalResource(resourceFile);
+
+ boolean resourceExists = localResource != null;
if (isReplicationEnabled && resourceExists) {
try {
createReplicationJob(ReplicationOperation.DELETE,
resourceFile);
@@ -221,8 +217,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
synchronized (this) {
try {
if (resourceExists) {
- final LocalResource localResource =
readLocalResource(resourceFile.getFile());
- IoUtil.delete(resourceFile);
+ ioManager.delete(resourceFile);
// delete all checkpoints
indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
} else {
@@ -241,18 +236,15 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return ioManager.resolve(fileName);
}
- public synchronized Map<Long, LocalResource>
getResources(Predicate<LocalResource> filter, List<Path> roots)
- throws HyracksDataException {
+ public synchronized Map<Long, LocalResource>
getResources(Predicate<LocalResource> filter,
+ List<FileReference> roots) throws HyracksDataException {
Map<Long, LocalResource> resourcesMap = new HashMap<>();
- for (Path root : roots) {
- if (!Files.exists(root) || !Files.isDirectory(root)) {
- continue;
- }
- final Collection<File> files = IoUtil.getMatchingFiles(root,
METADATA_FILES_FILTER);
+ for (FileReference root : roots) {
+ final Collection<FileReference> files =
ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
try {
- for (File file : files) {
+ for (FileReference file : files) {
final LocalResource localResource =
readLocalResource(file);
- if (filter.test(localResource)) {
+ if (localResource != null && filter.test(localResource)) {
resourcesMap.put(localResource.getId(), localResource);
}
}
@@ -270,7 +262,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
public synchronized Map<Long, LocalResource>
getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
throws HyracksDataException {
- List<Path> partitionsRoots = new ArrayList<>();
+ List<FileReference> partitionsRoots = new ArrayList<>();
for (Integer partition : partitions) {
partitionsRoots.add(getPartitionRoot(partition));
}
@@ -278,14 +270,15 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
public synchronized void deleteInvalidIndexes(Predicate<LocalResource>
filter) throws HyracksDataException {
- for (Path root : storageRoots) {
- final Collection<File> files = IoUtil.getMatchingFiles(root,
METADATA_FILES_FILTER);
+ for (FileReference root : storageRoots) {
+ final Collection<FileReference> files =
ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
try {
- for (File file : files) {
+ for (FileReference file : files) {
final LocalResource localResource =
readLocalResource(file);
- if (filter.test(localResource)) {
- LOGGER.warn("deleting invalid metadata index {}",
file.getParentFile());
- IoUtil.delete(file.getParentFile());
+ if (localResource != null && filter.test(localResource)) {
+ FileReference parent = file.getParent();
+ LOGGER.warn("deleting invalid metadata index {}",
parent);
+ ioManager.delete(parent);
}
}
} catch (IOException e) {
@@ -319,10 +312,14 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
: (path + File.separator +
StorageConstants.METADATA_FILE_NAME);
}
- private LocalResource readLocalResource(File file) throws
HyracksDataException {
- final Path path = Paths.get(file.getAbsolutePath());
+ private LocalResource readLocalResource(FileReference fileRef) throws
HyracksDataException {
+ byte[] bytes = ioManager.readAllBytes(fileRef);
+ if (bytes == null) {
+ return null;
+ }
+
try {
- final JsonNode jsonNode =
OBJECT_MAPPER.readValue(Files.readAllBytes(path), JsonNode.class);
+ final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes,
JsonNode.class);
LocalResource resource = (LocalResource)
persistedResourceRegistry.deserialize(jsonNode);
if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) {
return resource;
@@ -358,15 +355,10 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
/**
* Deletes physical files of all data verses.
- *
- * @throws IOException
*/
- public synchronized void deleteStorageData() throws IOException {
- for (Path root : storageRoots) {
- final File rootFile = root.toFile();
- if (rootFile.exists()) {
- FileUtils.deleteDirectory(rootFile);
- }
+ public synchronized void deleteStorageData() throws HyracksDataException {
+ for (FileReference root : storageRoots) {
+ ioManager.deleteDirectory(root);
}
createStorageRoots();
}
@@ -392,15 +384,15 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
* @return The set of indexes files
* @throws HyracksDataException
*/
- public synchronized Set<File> getPartitionIndexes(int partition) throws
HyracksDataException {
- Path partitionRoot = getPartitionRoot(partition);
+ public synchronized Set<FileReference> getPartitionIndexes(int partition)
throws HyracksDataException {
+ FileReference partitionRoot = getPartitionRoot(partition);
final Map<Long, LocalResource> partitionResourcesMap =
getResources(resource -> {
DatasetLocalResource dsResource = (DatasetLocalResource)
resource.getResource();
return dsResource.getPartition() == partition;
}, Collections.singletonList(partitionRoot));
- Set<File> indexes = new HashSet<>();
+ Set<FileReference> indexes = new HashSet<>();
for (LocalResource localResource : partitionResourcesMap.values()) {
- indexes.add(ioManager.resolve(localResource.getPath()).getFile());
+ indexes.add(ioManager.resolve(localResource.getPath()));
}
return indexes;
}
@@ -426,15 +418,15 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
public synchronized List<String> getPartitionReplicatedFiles(int
partition, IReplicationStrategy strategy)
throws HyracksDataException {
final List<String> partitionReplicatedFiles = new ArrayList<>();
- final Set<File> replicatedIndexes = new HashSet<>();
+ final Set<FileReference> replicatedIndexes = new HashSet<>();
final Map<Long, LocalResource> partitionResources =
getPartitionResources(partition);
for (LocalResource lr : partitionResources.values()) {
DatasetLocalResource datasetLocalResource = (DatasetLocalResource)
lr.getResource();
if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
-
replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile());
+ replicatedIndexes.add(ioManager.resolve(lr.getPath()));
}
}
- for (File indexDir : replicatedIndexes) {
+ for (FileReference indexDir : replicatedIndexes) {
partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
}
return partitionReplicatedFiles;
@@ -455,31 +447,23 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return maxComponentId;
}
- private List<String> getIndexFiles(File indexDir) {
+ private List<String> getIndexFiles(FileReference indexDir) throws
HyracksDataException {
final List<String> indexFiles = new ArrayList<>();
- if (indexDir.isDirectory()) {
- File[] indexFilteredFiles =
indexDir.listFiles(LSM_INDEX_FILES_FILTER);
- if (indexFilteredFiles != null) {
-
Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add);
- }
- }
+ Collection<FileReference> indexFilteredFiles =
ioManager.getMatchingFiles(indexDir, LSM_INDEX_FILES_FILTER);
+
indexFilteredFiles.stream().map(FileReference::getAbsolutePath).forEach(indexFiles::add);
return indexFiles;
}
private void createStorageRoots() {
- for (Path root : storageRoots) {
- try {
- Files.createDirectories(root);
- } catch (IOException e) {
- throw new IllegalStateException("Failed to create storage root
directory at " + root, e);
- }
+ for (FileReference root : storageRoots) {
+ ioManager.makeDirectories(root);
}
}
public synchronized void cleanup(int partition) throws
HyracksDataException {
- final Set<File> partitionIndexes = getPartitionIndexes(partition);
+ final Set<FileReference> partitionIndexes =
getPartitionIndexes(partition);
try {
- for (File index : partitionIndexes) {
+ for (FileReference index : partitionIndexes) {
deleteIndexMaskedFiles(index);
if (isValidIndex(index)) {
deleteIndexInvalidComponents(index);
@@ -504,30 +488,27 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
public synchronized void deleteCorruptedResources() throws
HyracksDataException {
- for (Path root : storageRoots) {
- final Collection<File> metadataMaskFiles =
IoUtil.getMatchingFiles(root, METADATA_MASK_FILES_FILTER);
- for (File metadataMaskFile : metadataMaskFiles) {
- final File resourceFile = new
File(metadataMaskFile.getParent(), METADATA_FILE_NAME);
- if (resourceFile.exists()) {
- IoUtil.delete(resourceFile);
- }
- IoUtil.delete(metadataMaskFile);
+ for (FileReference root : storageRoots) {
+ final Collection<FileReference> metadataMaskFiles =
+ ioManager.getMatchingFiles(root,
METADATA_MASK_FILES_FILTER);
+ for (FileReference metadataMaskFile : metadataMaskFiles) {
+ final FileReference resourceFile =
metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
+ ioManager.delete(resourceFile);
+ ioManager.delete(metadataMaskFile);
}
}
}
- private void deleteIndexMaskedFiles(File index) throws IOException {
- File[] masks = index.listFiles(MASK_FILES_FILTER);
- if (masks != null) {
- for (File mask : masks) {
- deleteIndexMaskedFiles(index, mask);
- // delete the mask itself
- Files.delete(mask.toPath());
- }
+ private void deleteIndexMaskedFiles(FileReference index) throws
IOException {
+ Collection<FileReference> masks = ioManager.getMatchingFiles(index,
MASK_FILES_FILTER);
+ for (FileReference mask : masks) {
+ deleteIndexMaskedFiles(index, mask);
+ // delete the mask itself
+ ioManager.delete(mask);
}
}
- private boolean isValidIndex(File index) throws IOException {
+ private boolean isValidIndex(FileReference index) throws IOException {
// any index without any checkpoint files is invalid
// this can happen if a crash happens when the index metadata file is
created
// but before the initial checkpoint is persisted. The index metadata
file will
@@ -535,46 +516,46 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return getIndexCheckpointManager(index).getCheckpointCount() != 0;
}
- private void deleteIndexInvalidComponents(File index) throws IOException,
ParseException {
- final File[] indexComponentFiles =
index.listFiles(COMPONENT_FILES_FILTER);
+ private void deleteIndexInvalidComponents(FileReference index) throws
IOException, ParseException {
+ final Collection<FileReference> indexComponentFiles =
ioManager.getMatchingFiles(index, COMPONENT_FILES_FILTER);
if (indexComponentFiles == null) {
throw new IOException(index + " doesn't exist or an IO error
occurred");
}
final long validComponentSequence =
getIndexCheckpointManager(index).getValidComponentSequence();
- for (File componentFile : indexComponentFiles) {
+ for (FileReference componentFileRef : indexComponentFiles) {
// delete any file with start or end sequence > valid component
sequence
- final long fileStart =
IndexComponentFileReference.of(componentFile.getName()).getSequenceStart();
- final long fileEnd =
IndexComponentFileReference.of(componentFile.getName()).getSequenceEnd();
+ final long fileStart =
IndexComponentFileReference.of(componentFileRef.getName()).getSequenceStart();
+ final long fileEnd =
IndexComponentFileReference.of(componentFileRef.getName()).getSequenceEnd();
if (fileStart > validComponentSequence || fileEnd >
validComponentSequence) {
- LOGGER.warn(() -> "Deleting invalid component file " +
componentFile.getAbsolutePath()
+ LOGGER.warn(() -> "Deleting invalid component file " +
componentFileRef.getAbsolutePath()
+ " based on valid sequence " +
validComponentSequence);
- Files.delete(componentFile.toPath());
+ ioManager.delete(componentFileRef);
}
}
}
- private IIndexCheckpointManager getIndexCheckpointManager(File index)
throws HyracksDataException {
- final String indexFile = Paths.get(index.getAbsolutePath(),
StorageConstants.METADATA_FILE_NAME).toString();
+ private IIndexCheckpointManager getIndexCheckpointManager(FileReference
index) throws HyracksDataException {
+ final String indexFile =
index.getChild(METADATA_FILE_NAME).getAbsolutePath();
final ResourceReference indexRef = ResourceReference.of(indexFile);
return indexCheckpointManagerProvider.get(indexRef);
}
- private void deleteIndexMaskedFiles(File index, File mask) throws
IOException {
- if (!mask.getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
+ private void deleteIndexMaskedFiles(FileReference index, FileReference
mask) throws IOException {
+ if
(!mask.getFile().getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
throw new IllegalArgumentException("Unrecognized mask file: " +
mask);
}
- File[] maskedFiles;
+ Collection<FileReference> maskedFiles;
if (isComponentMask(mask)) {
final String componentId =
mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length());
- maskedFiles = index.listFiles((dir, name) ->
name.startsWith(componentId));
+ maskedFiles = ioManager.getMatchingFiles(index, (dir, name) ->
name.startsWith(componentId));
} else {
final String maskedFileName =
mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length());
- maskedFiles = index.listFiles((dir, name) ->
name.equals(maskedFileName));
+ maskedFiles = ioManager.getMatchingFiles(index, (dir, name) ->
name.equals(maskedFileName));
}
if (maskedFiles != null) {
- for (File maskedFile : maskedFiles) {
+ for (FileReference maskedFile : maskedFiles) {
LOGGER.info(() -> "deleting masked file: " +
maskedFile.getAbsolutePath());
- Files.delete(maskedFile.toPath());
+ ioManager.delete(maskedFile);
}
}
}
@@ -583,13 +564,13 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
try {
final FileReference resolvedPath =
ioManager.resolve(resource.getRelativePath().toString());
long totalSize = 0;
- final File[] indexFiles = resolvedPath.getFile().listFiles();
+ final Collection<FileReference> indexFiles =
ioManager.list(resolvedPath);
final Map<String, Long> componentsStats = new HashMap<>();
if (indexFiles != null) {
- for (File file : indexFiles) {
- long fileSize = file.length();
+ for (FileReference file : indexFiles) {
+ long fileSize = ioManager.getSize(file);
totalSize += fileSize;
- if (isComponentFile(resolvedPath.getFile(),
file.getName())) {
+ if (isComponentFile(resolvedPath, file.getName())) {
String componentSeq =
getComponentSequence(file.getAbsolutePath());
componentsStats.put(componentSeq,
componentsStats.getOrDefault(componentSeq, 0L) + fileSize);
}
@@ -621,73 +602,69 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
private void createResourceFileMask(FileReference resourceFile) throws
HyracksDataException {
- Path maskFile = getResourceMaskFilePath(resourceFile);
+ FileReference maskFile = getResourceMaskFilePath(resourceFile);
try {
- Files.createFile(maskFile);
+ ioManager.create(maskFile);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
}
private void deleteResourceFileMask(FileReference resourceFile) throws
HyracksDataException {
- Path maskFile = getResourceMaskFilePath(resourceFile);
- IoUtil.delete(maskFile);
+ FileReference maskFile = getResourceMaskFilePath(resourceFile);
+ ioManager.delete(maskFile);
}
- private Path getResourceMaskFilePath(FileReference resourceFile) {
- return
Paths.get(resourceFile.getFile().getParentFile().getAbsolutePath(),
METADATA_FILE_MASK_NAME);
+ private FileReference getResourceMaskFilePath(FileReference resourceFile) {
+ FileReference resourceFileParent = resourceFile.getParent();
+ return resourceFileParent.getChild(METADATA_FILE_MASK_NAME);
}
- private static boolean isComponentMask(File mask) {
+ private static boolean isComponentMask(FileReference mask) {
return
mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX);
}
- private static boolean isComponentFile(File indexDir, String fileName) {
- return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
- }
-
- public List<Path> getStorageRoots() {
- return storageRoots;
+ private static boolean isComponentFile(FileReference indexDir, String
fileName) {
+ return COMPONENT_FILES_FILTER.accept(indexDir.getFile(), fileName);
}
- public synchronized void keepPartitions(Set<Integer> keepPartitions) {
- List<File> onDiskPartitions = getOnDiskPartitions();
- for (File onDiskPartition : onDiskPartitions) {
+ public synchronized void keepPartitions(Set<Integer> keepPartitions)
throws HyracksDataException {
+ List<FileReference> onDiskPartitions = getOnDiskPartitions();
+ for (FileReference onDiskPartition : onDiskPartitions) {
int partitionNum =
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
if (!keepPartitions.contains(partitionNum)) {
LOGGER.warn("deleting partition {} since it is not on
partitions to keep {}", partitionNum,
keepPartitions);
- FileUtils.deleteQuietly(onDiskPartition);
+ ioManager.delete(onDiskPartition);
}
}
}
- public synchronized List<File> getOnDiskPartitions() {
- List<File> onDiskPartitions = new ArrayList<>();
- for (Path root : storageRoots) {
- File[] partitions = root.toFile().listFiles(
+ public synchronized List<FileReference> getOnDiskPartitions() throws
HyracksDataException {
+ List<FileReference> onDiskPartitions = new ArrayList<>();
+ for (FileReference root : storageRoots) {
+ Collection<FileReference> partitions = ioManager.list(root,
(dir, name) -> dir.isDirectory() &&
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
if (partitions != null) {
- onDiskPartitions.addAll(Arrays.asList(partitions));
+ onDiskPartitions.addAll(partitions);
}
}
return onDiskPartitions;
}
- public Path getPartitionRoot(int partition) throws HyracksDataException {
- Path path =
- Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME,
StorageConstants.PARTITION_DIR_PREFIX + partition);
- FileReference resolve = ioManager.resolve(path.toString());
- return resolve.getFile().toPath();
+ public FileReference getPartitionRoot(int partition) throws
HyracksDataException {
+ String path = StorageConstants.STORAGE_ROOT_DIR_NAME + File.separator
+ StorageConstants.PARTITION_DIR_PREFIX
+ + partition;
+ return ioManager.resolve(path);
}
- public void deletePartition(int partitionId) {
- List<File> onDiskPartitions = getOnDiskPartitions();
- for (File onDiskPartition : onDiskPartitions) {
+ public void deletePartition(int partitionId) throws HyracksDataException {
+ Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
+ for (FileReference onDiskPartition : onDiskPartitions) {
int partitionNum =
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
if (partitionNum == partitionId) {
LOGGER.warn("deleting partition {}", partitionNum);
- FileUtils.deleteQuietly(onDiskPartition);
+ ioManager.delete(onDiskPartition);
return;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
index cec1598ca4..c826ed7c97 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
@@ -97,6 +97,10 @@ public class FileReference implements Serializable {
return path + File.separator + name;
}
+ public String getName() {
+ return file.getName();
+ }
+
public void register() {
if (registrationTime != 0) {
throw new IllegalStateException(
@@ -125,7 +129,7 @@ public class FileReference implements Serializable {
if (parentIndex < 0) {
return new FileReference(dev, "");
}
- String parentPath = path.substring(parentIndex);
+ String parentPath = path.substring(0, parentIndex);
return new FileReference(dev, parentPath);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 5f93c21a26..07e1c78a5f 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -67,6 +67,8 @@ public interface IIOManager extends Closeable {
long getSize(IFileHandle fileHandle);
+ long getSize(FileReference fileReference);
+
WritableByteChannel newWritableChannel(IFileHandle fileHandle);
void deleteWorkspaceFiles() throws HyracksDataException;
@@ -134,11 +136,16 @@ public interface IIOManager extends Closeable {
void deleteDirectory(FileReference root) throws HyracksDataException;
+ // TODO: Remove and use list
Collection<FileReference> getMatchingFiles(FileReference root,
FilenameFilter filter) throws HyracksDataException;
boolean exists(FileReference fileRef);
void create(FileReference fileRef) throws HyracksDataException;
+ boolean makeDirectories(FileReference resourceDir);
+
+ void cleanDirectory(FileReference resourceDir) throws HyracksDataException;
+
void syncFiles(Set<Integer> activePartitions) throws HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index 6c74838523..8b3fdec1fc 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -92,6 +92,14 @@ public class FileHandle implements IFileHandle {
return raf.getChannel();
}
+ public void setLength(long newLength) throws HyracksDataException {
+ try {
+ raf.setLength(newLength);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
public synchronized void ensureOpen() throws HyracksDataException {
if (raf == null || !raf.getChannel().isOpen()) {
try {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 4eebf6b898..24d1061ae3 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -400,7 +400,12 @@ public class IOManager implements IIOManager {
@Override
public long getSize(IFileHandle fileHandle) {
- return fileHandle.getFileReference().getFile().length();
+ return getSize(fileHandle.getFileReference());
+ }
+
+ @Override
+ public long getSize(FileReference fileReference) {
+ return fileReference.getFile().length();
}
@Override
@@ -577,7 +582,7 @@ public class IOManager implements IIOManager {
public Collection<FileReference> getMatchingFiles(FileReference root,
FilenameFilter filter)
throws HyracksDataException {
File rootFile = root.getFile();
- if (!rootFile.exists()) {
+ if (!rootFile.exists() || !rootFile.isDirectory()) {
return Collections.emptyList();
}
@@ -600,6 +605,20 @@ public class IOManager implements IIOManager {
IoUtil.create(fileRef);
}
+ @Override
+ public boolean makeDirectories(FileReference resourceDir) {
+ return resourceDir.getFile().mkdirs();
+ }
+
+ @Override
+ public void cleanDirectory(FileReference resourceDir) throws
HyracksDataException {
+ try {
+ FileUtils.cleanDirectory(resourceDir.getFile());
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
@Override
public void copyDirectory(FileReference srcFileRef, FileReference
destFileRef) throws HyracksDataException {
try {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 91316dc4ef..08eba0df68 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -61,15 +61,17 @@ public class LSMBTreeFileManager extends
AbstractLSMIndexFileManager {
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws
HyracksDataException {
String baseName = getNextComponentSequence(BTREE_FILTER);
- return new LSMComponentFileReferences(getFileReference(baseName +
DELIMITER + BTREE_SUFFIX), null,
- hasBloomFilter ? getFileReference(baseName + DELIMITER +
BLOOM_FILTER_SUFFIX) : null);
+ return new
LSMComponentFileReferences(getCompressedFileReferenceIfAny(baseName + DELIMITER
+ BTREE_SUFFIX),
+ null,
+ hasBloomFilter ? getCompressedFileReferenceIfAny(baseName +
DELIMITER + BLOOM_FILTER_SUFFIX) : null);
}
@Override
public LSMComponentFileReferences getRelMergeFileReference(String
firstFileName, String lastFileName) {
final String baseName =
IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
- return new LSMComponentFileReferences(getFileReference(baseName +
DELIMITER + BTREE_SUFFIX), null,
- hasBloomFilter ? getFileReference(baseName + DELIMITER +
BLOOM_FILTER_SUFFIX) : null);
+ return new
LSMComponentFileReferences(getCompressedFileReferenceIfAny(baseName + DELIMITER
+ BTREE_SUFFIX),
+ null,
+ hasBloomFilter ? getCompressedFileReferenceIfAny(baseName +
DELIMITER + BLOOM_FILTER_SUFFIX) : null);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 4da207da00..610232f812 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.hyracks.api.compression.ICompressorDecompressor;
import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
@@ -32,7 +33,6 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
@@ -139,9 +139,9 @@ public abstract class AbstractLSMIndexFileManager
implements ILSMIndexFileManage
protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
TreeIndexFactory<? extends ITreeIndex> treeFactory,
ArrayList<IndexComponentFileReference> allFiles,
IBufferCache bufferCache) throws HyracksDataException {
- String[] files = listDirFiles(baseDir, filter);
- for (String fileName : files) {
- FileReference fileRef = getFileReference(fileName);
+ Set<FileReference> files = ioManager.list(baseDir, filter);
+ for (FileReference filePath : files) {
+ FileReference fileRef =
getCompressedFileReferenceIfAny(filePath.getName());
if (treeFactory == null) {
allFiles.add(IndexComponentFileReference.of(fileRef));
continue;
@@ -155,24 +155,6 @@ public abstract class AbstractLSMIndexFileManager
implements ILSMIndexFileManage
}
}
- static String[] listDirFiles(FileReference dir, FilenameFilter filter)
throws HyracksDataException {
- /*
- * Returns null if this abstract pathname does not denote a directory,
or if an I/O error occurs.
- */
- String[] files = dir.getFile().list(filter);
- if (files == null) {
- if (!dir.getFile().canRead()) {
- throw HyracksDataException.create(ErrorCode.CANNOT_READ_FILE,
dir);
- } else if (!dir.getFile().exists()) {
- throw
HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, dir);
- } else if (!dir.getFile().isDirectory()) {
- throw
HyracksDataException.create(ErrorCode.FILE_IS_NOT_DIRECTORY, dir);
- }
- throw
HyracksDataException.create(ErrorCode.UNIDENTIFIED_IO_ERROR_READING_FILE, dir);
- }
- return files;
- }
-
protected void validateFiles(HashSet<String> groundTruth,
ArrayList<IndexComponentFileReference> validFiles,
FilenameFilter filter, TreeIndexFactory<? extends ITreeIndex>
treeFactory, IBufferCache bufferCache)
throws HyracksDataException {
@@ -189,15 +171,15 @@ public abstract class AbstractLSMIndexFileManager
implements ILSMIndexFileManage
@Override
public void createDirs() throws HyracksDataException {
- if (baseDir.getFile().exists()) {
+ if (ioManager.exists(baseDir)) {
throw
HyracksDataException.create(ErrorCode.CANNOT_CREATE_EXISTING_INDEX);
}
- baseDir.getFile().mkdirs();
+ ioManager.makeDirectories(baseDir);
}
@Override
public void deleteDirs() throws HyracksDataException {
- IoUtil.delete(baseDir);
+ ioManager.deleteDirectory(baseDir);
}
@Override
@@ -314,7 +296,7 @@ public abstract class AbstractLSMIndexFileManager
implements ILSMIndexFileManage
return
IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
}
- protected FileReference getFileReference(String name) {
+ protected FileReference getCompressedFileReferenceIfAny(String name) {
final ICompressorDecompressor compDecomp =
compressorDecompressorFactory.createInstance();
//Avoid creating LAF file for NoOpCompressorDecompressor
if (compDecomp != NoOpCompressorDecompressor.INSTANCE &&
isCompressible(name)) {
@@ -331,9 +313,9 @@ public abstract class AbstractLSMIndexFileManager
implements ILSMIndexFileManage
private long getOnDiskLastUsedComponentSequence(FilenameFilter
filenameFilter) throws HyracksDataException {
long maxComponentSeq = -1;
- final String[] files = listDirFiles(baseDir, filenameFilter);
- for (String fileName : files) {
- maxComponentSeq = Math.max(maxComponentSeq,
IndexComponentFileReference.of(fileName).getSequenceEnd());
+ final Set<FileReference> files = ioManager.list(baseDir,
filenameFilter);
+ for (FileReference file : files) {
+ maxComponentSeq = Math.max(maxComponentSeq,
IndexComponentFileReference.of(file).getSequenceEnd());
}
return maxComponentSeq;
}