This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 63c7093fdb [ASTERIXDB-3196][TX] Support atomic Txn with no WAL
63c7093fdb is described below

commit 63c7093fdbf23083e4b940180196bee37e5d818f
Author: Murtadha Hubail <[email protected]>
AuthorDate: Wed May 31 04:17:28 2023 +0300

    [ASTERIXDB-3196][TX] Support atomic Txn with no WAL
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Prevent concurrent metadata catalog modification.
    - Introduce an atomic txn without WAL that persists
      all records on commit or deletes them on abort.
    - Compute-storage separation fixes.
    
    Change-Id: Icfd034a4dc0b6464564e2129a166e4ceb0dc7b41
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17583
    Reviewed-by: Murtadha Hubail <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../asterix/app/nc/IndexCheckpointManager.java     |  7 +-
 .../api/common/CloudStorageIntegrationUtil.java    |  2 +-
 .../TestPrimaryIndexOperationTrackerFactory.java   | 11 ---
 .../org/apache/asterix/cloud/CloudIOManager.java   |  8 +-
 .../context/PrimaryIndexOperationTracker.java      | 27 +++++++
 .../common/transactions/ITransactionContext.java   | 14 ++++
 .../common/transactions/ITransactionManager.java   |  4 +
 .../org/apache/asterix/metadata/MetadataNode.java  | 26 +++++--
 .../messaging/CheckpointPartitionIndexesTask.java  |  2 +-
 .../opcallbacks/NoOpModificationOpCallback.java    | 46 +++++++++++
 .../PersistentLocalResourceRepository.java         | 27 +++----
 .../transaction/AbstractTransactionContext.java    | 21 ++++++
 .../transaction/AtomicNoWALTransactionContext.java | 88 ++++++++++++++++++++++
 .../transaction/AtomicTransactionContext.java      | 10 +++
 .../transaction/EntityLevelTransactionContext.java |  5 ++
 .../transaction/TransactionContextFactory.java     |  2 +
 .../service/transaction/TransactionManager.java    | 22 +++---
 .../java/org/apache/hyracks/api/io/IIOManager.java | 11 ++-
 .../java/org/apache/hyracks/api/util/IoUtil.java   | 17 +++++
 .../apache/hyracks/control/nc/io/IOManager.java    | 37 +--------
 .../storage/am/common/build/IndexBuilder.java      |  3 +-
 .../buffercache/AbstractBufferedFileIOManager.java |  8 +-
 .../storage/common/buffercache/BufferCache.java    |  7 +-
 .../compression/file/CompressedFileManager.java    |  7 +-
 .../common/file/CompressedBufferedFileHandle.java  |  2 +-
 .../am/common/AbstractIndexLifecycleTest.java      |  2 -
 26 files changed, 312 insertions(+), 104 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 2ed163812e..372cf69c7c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -154,7 +154,7 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
         }
         if (checkpoints.isEmpty()) {
             LOGGER.warn("Couldn't find any checkpoint file for index {}. 
Content of dir are {}.", indexPath,
-                    ioManager.getMatchingFiles(indexPath, 
IoUtil.NO_OP_FILTER).toString());
+                    ioManager.list(indexPath, IoUtil.NO_OP_FILTER).toString());
             throw new IllegalStateException("Couldn't find any checkpoints for 
resource: " + indexPath);
         }
         
checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
@@ -182,7 +182,7 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
 
     private List<IndexCheckpoint> getCheckpoints() throws 
ClosedByInterruptException, HyracksDataException {
         List<IndexCheckpoint> checkpoints = new ArrayList<>();
-        final Collection<FileReference> checkpointFiles = 
ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+        final Collection<FileReference> checkpointFiles = 
ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
         if (!checkpointFiles.isEmpty()) {
             for (FileReference checkpointFile : checkpointFiles) {
                 try {
@@ -229,8 +229,7 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
 
     private void deleteHistory(long latestId, int historyToKeep) {
         try {
-            final Collection<FileReference> checkpointFiles =
-                    ioManager.getMatchingFiles(indexPath, 
CHECKPOINT_FILE_FILTER);
+            final Collection<FileReference> checkpointFiles = 
ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
             if (!checkpointFiles.isEmpty()) {
                 for (FileReference checkpointFile : checkpointFiles) {
                     if (getCheckpointIdFromFileName(checkpointFile) < 
(latestId - historyToKeep)) {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
index cc69bb15c0..eb2b05bcb8 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
@@ -27,7 +27,7 @@ public class CloudStorageIntegrationUtil extends 
AsterixHyracksIntegrationUtil {
     public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, 
"cc-cloud-storage.conf");
 
     public static void main(String[] args) throws Exception {
-        //        CloudUtils.startS3CloudEnvironment();
+        CloudUtils.startS3CloudEnvironment();
         final AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
         try {
             integrationUtil.run(Boolean.getBoolean("cleanup.start"), 
Boolean.getBoolean("cleanup.shutdown"),
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index c4390fa52c..38fdf5689d 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.test.dataflow;
 
 import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
 import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -70,20 +69,10 @@ public class TestPrimaryIndexOperationTrackerFactory 
extends PrimaryIndexOperati
         }
     }
 
-    static void setFinal(Field field, Object obj, Object newValue) throws 
Exception {
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(obj, newValue);
-    }
-
     @SuppressWarnings({ "rawtypes", "unchecked" })
     static void replaceMapEntry(Field field, Object obj, Object key, Object 
value)
             throws Exception, IllegalAccessException {
         field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
         Map map = (Map) field.get(obj);
         map.put(key, value);
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
index f87adbaefd..799da68333 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
@@ -185,7 +185,8 @@ public class CloudIOManager extends IOManager {
 
         // Add the remaining files that are not stored locally (if any)
         for (String cloudFile : cloudFiles) {
-            
localFiles.add(dir.getChild(IoUtil.getFileNameFromPath(cloudFile)));
+            localFiles.add(new FileReference(dir.getDeviceHandle(),
+                    
cloudFile.substring(cloudFile.indexOf(dir.getRelativePath()))));
         }
         return new HashSet<>(localFiles);
     }
@@ -244,12 +245,11 @@ public class CloudIOManager extends IOManager {
         return super.doSyncRead(fHandle, offset, data);
     }
 
-    // TODO: We need to download this too
     @Override
     public byte[] readAllBytes(FileReference fileRef) throws 
HyracksDataException {
         if (!fileRef.getFile().exists()) {
-            // TODO(htowaileb): if it does not exist, download (lazy)
-            // TODO(htowaileb): make sure downloading the file is synchronous 
since many can request it at the same time
+            IFileHandle open = open(fileRef, FileReadWriteMode.READ_WRITE, 
FileSyncMode.METADATA_SYNC_DATA_SYNC);
+            fileRef = open.getFileReference();
         }
         return super.readAllBytes(fileRef);
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b0d8e02af7..2704e6429b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -37,6 +38,7 @@ import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -297,6 +299,31 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker implement
         return "Dataset (" + datasetID + "), Partition (" + partition + ")";
     }
 
+    public void deleteMemoryComponent() throws HyracksDataException {
+        Set<ILSMIndex> indexes = 
dsInfo.getDatasetPartitionOpenIndexes(partition);
+        ILSMIndex primaryLsmIndex = null;
+        for (ILSMIndex lsmIndex : indexes) {
+            if (lsmIndex.isPrimaryIndex()) {
+                if (lsmIndex.isCurrentMutableComponentEmpty()) {
+                    LOGGER.info("Primary index on dataset {} and partition {} 
is empty... skipping delete",
+                            dsInfo.getDatasetID(), partition);
+                    return;
+                }
+                primaryLsmIndex = lsmIndex;
+                break;
+            }
+        }
+        Objects.requireNonNull(primaryLsmIndex, "no primary index found in " + 
indexes);
+        idGenerator.refresh();
+        ILSMComponentId nextComponentId = idGenerator.getId();
+        Map<String, Object> flushMap = new HashMap<>();
+        flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
+        flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, 
nextComponentId);
+        ILSMIndexAccessor accessor = 
primaryLsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+        accessor.getOpContext().setParameters(flushMap);
+        accessor.deleteComponents(c -> c.getType() == 
ILSMComponent.LSMComponentType.MEMORY);
+    }
+
     private boolean canSafelyFlush() {
         return numActiveOperations.get() == 0;
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index 940535f506..34d7caa1c9 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.transactions;
 
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 
@@ -145,4 +147,16 @@ public interface ITransactionContext {
      * so that any resources held by the transaction may be released
      */
     void complete();
+
+    /**
+     * Acquires {@code lock} write lock and sets the transactions as a write 
transaction
+     * @param lock
+     */
+    void acquireExclusiveWriteLock(ReentrantLock lock);
+
+    /**
+     * Determines if this tx uses WAL
+     * @return true if this tx uses WAL. Otherwise, false.
+     */
+    boolean hasWAL();
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 396d3f6c8e..30c693d219 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -42,6 +42,10 @@ public interface ITransactionManager {
          * all records are committed or nothing
          */
         ATOMIC,
+        /**
+         * all records are committed and persisted to disk or nothing
+         */
+        ATOMIC_NO_WAL,
         /**
          * any record with entity commit log
          */
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 2195f88cba..522fd32556 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -116,6 +117,7 @@ import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.fulltext.FullTextConfigDescriptor;
 import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
+import 
org.apache.asterix.transaction.management.opcallbacks.NoOpModificationOpCallback;
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
 import 
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallback;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
@@ -161,6 +163,8 @@ public class MetadataNode implements IMetadataNode {
     private transient MetadataTupleTranslatorProvider tupleTranslatorProvider;
     // extension only
     private Map<ExtensionMetadataDatasetId, ExtensionMetadataDataset<?>> 
extensionDatasets;
+    private final ReentrantLock metadataModificationLock = new 
ReentrantLock(true);
+    private boolean atomicNoWAL;
 
     public static final MetadataNode INSTANCE = new MetadataNode();
 
@@ -184,6 +188,7 @@ public class MetadataNode implements IMetadataNode {
             }
         }
         this.txnIdFactory = new CachingTxnIdFactory(runtimeContext);
+        atomicNoWAL = runtimeContext.isCloudDeployment();
     }
 
     public int getMetadataStoragePartition() {
@@ -192,7 +197,8 @@ public class MetadataNode implements IMetadataNode {
 
     @Override
     public void beginTransaction(TxnId transactionId) {
-        TransactionOptions options = new 
TransactionOptions(AtomicityLevel.ATOMIC);
+        AtomicityLevel lvl = atomicNoWAL ? AtomicityLevel.ATOMIC_NO_WAL : 
AtomicityLevel.ATOMIC;
+        TransactionOptions options = new TransactionOptions(lvl);
         
transactionSubsystem.getTransactionManager().beginTransaction(transactionId, 
options);
     }
 
@@ -501,11 +507,7 @@ public class MetadataNode implements IMetadataNode {
         if (!force) {
             confirmFullTextFilterCanBeDeleted(txnId, dataverseName, 
filterName);
         }
-
         try {
-            FullTextFilterMetadataEntityTupleTranslator translator =
-                    
tupleTranslatorProvider.getFullTextFilterTupleTranslator(true);
-
             ITupleReference key = 
createTuple(dataverseName.getCanonicalForm(), filterName);
             deleteTupleFromIndex(txnId, 
MetadataPrimaryIndexes.FULL_TEXT_FILTER_DATASET, key);
         } catch (HyracksDataException e) {
@@ -612,7 +614,7 @@ public class MetadataNode implements IMetadataNode {
             IModificationOperationCallback modCallback = 
createIndexModificationCallback(op, txnCtx, metadataIndex);
             IIndexAccessParameters iap = new 
IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
-            txnCtx.setWriteTxn(true);
+            txnCtx.acquireExclusiveWriteLock(metadataModificationLock);
             txnCtx.register(metadataIndex.getResourceId(),
                     
StoragePathUtil.getPartitionNumFromRelativePath(resourceName), lsmIndex, 
modCallback,
                     metadataIndex.isPrimaryIndex());
@@ -640,6 +642,12 @@ public class MetadataNode implements IMetadataNode {
         switch (indexOp) {
             case INSERT:
             case DELETE:
+                if (!txnCtx.hasWAL()) {
+                    return new 
NoOpModificationOpCallback(metadataIndex.getDatasetId(),
+                            metadataIndex.getPrimaryKeyIndexes(), txnCtx, 
transactionSubsystem.getLockManager(),
+                            transactionSubsystem, 
metadataIndex.getResourceId(), metadataStoragePartition,
+                            ResourceType.LSM_BTREE, indexOp);
+                }
                 /*
                  * Regardless of the index type (primary or secondary index), 
secondary index modification
                  * callback is given. This is still correct since metadata 
index operation doesn't require
@@ -650,6 +658,12 @@ public class MetadataNode implements IMetadataNode {
                         transactionSubsystem, metadataIndex.getResourceId(), 
metadataStoragePartition,
                         ResourceType.LSM_BTREE, indexOp);
             case UPSERT:
+                if (!txnCtx.hasWAL()) {
+                    return new 
NoOpModificationOpCallback(metadataIndex.getDatasetId(),
+                            metadataIndex.getPrimaryKeyIndexes(), txnCtx, 
transactionSubsystem.getLockManager(),
+                            transactionSubsystem, 
metadataIndex.getResourceId(), metadataStoragePartition,
+                            ResourceType.LSM_BTREE, indexOp);
+                }
                 return new 
UpsertOperationCallback(metadataIndex.getDatasetId(), 
metadataIndex.getPrimaryKeyIndexes(),
                         txnCtx, transactionSubsystem.getLockManager(), 
transactionSubsystem,
                         metadataIndex.getResourceId(), 
metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index a6ba0e2916..11bac0d99e 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -71,7 +71,7 @@ public class CheckpointPartitionIndexesTask implements 
IReplicaTask {
             // Get most recent sequence of existing files to avoid deletion
             FileReference indexPath = StoragePathUtil.getIndexPath(ioManager, 
ref);
             Collection<FileReference> files =
-                    ioManager.getMatchingFiles(indexPath, 
AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+                    ioManager.list(indexPath, 
AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
             if (files == null) {
                 throw HyracksDataException
                         .create(new IOException(indexPath + " is not a 
directory or an IO Error occurred"));
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java
new file mode 100644
index 0000000000..ad77939a66
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.opcallbacks;
+
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class NoOpModificationOpCallback extends 
AbstractIndexModificationOperationCallback {
+
+    public NoOpModificationOpCallback(DatasetId datasetId, int[] 
primaryKeyFields, ITransactionContext txnCtx,
+            ILockManager lockManager, ITransactionSubsystem txnSubsystem, long 
resourceId, int resourcePartition,
+            byte resourceType, Operation indexOp) {
+        super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, 
resourceId, resourcePartition,
+                resourceType, indexOp);
+    }
+
+    @Override
+    public void before(ITupleReference tuple) throws HyracksDataException {
+        // no op
+    }
+
+    @Override
+    public void found(ITupleReference before, ITupleReference after) throws 
HyracksDataException {
+        // no op
+    }
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 25b961017c..4e71b1c629 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -63,6 +63,7 @@ import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 import 
org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
@@ -240,7 +241,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
             List<FileReference> roots) throws HyracksDataException {
         Map<Long, LocalResource> resourcesMap = new HashMap<>();
         for (FileReference root : roots) {
-            final Collection<FileReference> files = 
ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
+            final Collection<FileReference> files = ioManager.list(root, 
METADATA_FILES_FILTER);
             try {
                 for (FileReference file : files) {
                     final LocalResource localResource = 
readLocalResource(file);
@@ -274,7 +275,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
 
     public synchronized void deleteInvalidIndexes(Predicate<LocalResource> 
filter) throws HyracksDataException {
         for (FileReference root : storageRoots) {
-            final Collection<FileReference> files = 
ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
+            final Collection<FileReference> files = ioManager.list(root, 
METADATA_FILES_FILTER);
             try {
                 for (FileReference file : files) {
                     final LocalResource localResource = 
readLocalResource(file);
@@ -452,7 +453,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
 
     private List<String> getIndexFiles(FileReference indexDir) throws 
HyracksDataException {
         final List<String> indexFiles = new ArrayList<>();
-        Collection<FileReference> indexFilteredFiles = 
ioManager.getMatchingFiles(indexDir, LSM_INDEX_FILES_FILTER);
+        Collection<FileReference> indexFilteredFiles = 
ioManager.list(indexDir, LSM_INDEX_FILES_FILTER);
         
indexFilteredFiles.stream().map(FileReference::getAbsolutePath).forEach(indexFiles::add);
         return indexFiles;
     }
@@ -492,8 +493,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
 
     public synchronized void deleteCorruptedResources() throws 
HyracksDataException {
         for (FileReference root : storageRoots) {
-            final Collection<FileReference> metadataMaskFiles =
-                    ioManager.getMatchingFiles(root, 
METADATA_MASK_FILES_FILTER);
+            final Collection<FileReference> metadataMaskFiles = 
ioManager.list(root, METADATA_MASK_FILES_FILTER);
             for (FileReference metadataMaskFile : metadataMaskFiles) {
                 final FileReference resourceFile = 
metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
                 ioManager.delete(resourceFile);
@@ -503,7 +503,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     private void deleteIndexMaskedFiles(FileReference index) throws 
IOException {
-        Collection<FileReference> masks = ioManager.getMatchingFiles(index, 
MASK_FILES_FILTER);
+        Collection<FileReference> masks = ioManager.list(index, 
MASK_FILES_FILTER);
         for (FileReference mask : masks) {
             deleteIndexMaskedFiles(index, mask);
             // delete the mask itself
@@ -520,7 +520,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     private void deleteIndexInvalidComponents(FileReference index) throws 
IOException, ParseException {
-        final Collection<FileReference> indexComponentFiles = 
ioManager.getMatchingFiles(index, COMPONENT_FILES_FILTER);
+        final Collection<FileReference> indexComponentFiles = 
ioManager.list(index, COMPONENT_FILES_FILTER);
         if (indexComponentFiles == null) {
             throw new IOException(index + " doesn't exist or an IO error 
occurred");
         }
@@ -550,10 +550,10 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         Collection<FileReference> maskedFiles;
         if (isComponentMask(mask)) {
             final String componentId = 
mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length());
-            maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> 
name.startsWith(componentId));
+            maskedFiles = ioManager.list(index, (dir, name) -> 
name.startsWith(componentId));
         } else {
             final String maskedFileName = 
mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length());
-            maskedFiles = ioManager.getMatchingFiles(index, (dir, name) -> 
name.equals(maskedFileName));
+            maskedFiles = ioManager.list(index, (dir, name) -> 
name.equals(maskedFileName));
         }
         if (maskedFiles != null) {
             for (FileReference maskedFile : maskedFiles) {
@@ -643,14 +643,11 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         }
     }
 
-    public synchronized List<FileReference> getOnDiskPartitions() throws 
HyracksDataException {
+    public synchronized List<FileReference> getOnDiskPartitions() {
         List<FileReference> onDiskPartitions = new ArrayList<>();
         for (FileReference root : storageRoots) {
-            Collection<FileReference> partitions = ioManager.list(root, (dir, 
name) -> dir != null && dir.isDirectory()
-                    && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
-            if (partitions != null) {
-                onDiskPartitions.addAll(partitions);
-            }
+            onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir, 
name) -> dir != null && dir.isDirectory()
+                    && 
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)));
         }
         return onDiskPartitions;
     }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 104f9a78ac..8a1856ced5 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -23,8 +23,10 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.asterix.common.context.ITransactionOperationTracker;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TxnId;
@@ -42,6 +44,7 @@ public abstract class AbstractTransactionContext implements 
ITransactionContext
     private final AtomicInteger txnState;
     private final AtomicBoolean isWriteTxn;
     private volatile boolean isTimeout;
+    private ReentrantLock exclusiveLock;
 
     protected AbstractTransactionContext(TxnId txnId) {
         this.txnId = txnId;
@@ -89,6 +92,21 @@ public abstract class AbstractTransactionContext implements 
ITransactionContext
         return isTimeout;
     }
 
+    /**
+     * Marks this context as a write transaction after acquiring {@code exclusiveLock}.
+     * Does nothing when the context is already flagged as a write transaction. Otherwise blocks
+     * (interruptibly) until the lock is obtained, stores the lock in this context and sets the write flag.
+     * NOTE(review): the stored lock is unlocked elsewhere in this class; {@link ReentrantLock#unlock()} must be
+     * called by the thread that acquired the lock -- confirm that both calls happen on the same thread.
+     *
+     * @param exclusiveLock the lock to hold on behalf of this transaction
+     * @throws ACIDException if the calling thread is interrupted while waiting for the lock; the thread's
+     *             interrupt flag is restored before the exception is thrown
+     */
+    @Override
+    public void acquireExclusiveWriteLock(ReentrantLock exclusiveLock) {
+        if (!isWriteTxn.get()) {
+            try {
+                exclusiveLock.lockInterruptibly();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new ACIDException(e);
+            }
+            // only reached once the lock is held
+            this.exclusiveLock = exclusiveLock;
+            setWriteTxn(true);
+        }
+    }
+
     @Override
     public void setWriteTxn(boolean isWriteTxn) {
         this.isWriteTxn.set(isWriteTxn);
@@ -114,6 +132,9 @@ public abstract class AbstractTransactionContext implements 
ITransactionContext
             synchronized (txnOpTrackers) {
                 txnOpTrackers.forEach((resource, opTracker) -> 
opTracker.afterTransaction(resource));
             }
+            if (exclusiveLock != null) {
+                exclusiveLock.unlock();
+            }
         }
     }
 
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
new file mode 100644
index 0000000000..cfa39c61c0
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+/**
+ * An atomic transaction context that reports no write-ahead log ({@link #hasWAL()} is {@code false}).
+ * On {@link #cleanup()}, a committed transaction schedules a flush on every index it modified and waits for
+ * those flushes, while an aborted transaction deletes the memory component of every index it modified.
+ */
+@ThreadSafe
+public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
+
+    public AtomicNoWALTransactionContext(TxnId txnId) {
+        super(txnId);
+    }
+
+    /**
+     * Runs the parent cleanup and then finalizes the modified indexes according to the transaction state.
+     *
+     * @throws IllegalStateException if the transaction is neither committed nor aborted
+     * @throws ACIDException if deleting or flushing a memory component fails
+     */
+    @Override
+    public void cleanup() {
+        super.cleanup();
+        final int txnState = getTxnState();
+        switch (txnState) {
+            case ITransactionManager.ABORTED:
+                deleteUncommittedRecords();
+                break;
+            case ITransactionManager.COMMITTED:
+                ensureDurable();
+                break;
+            default:
+                // report the state that was actually evaluated rather than re-reading it
+                throw new IllegalStateException("invalid state in txn clean up: " + txnState);
+        }
+    }
+
+    /**
+     * Returns a point-in-time copy of the operation trackers of the indexes modified by this transaction.
+     * {@code modifiedIndexes} is created with {@link java.util.Collections#synchronizedSet}, which must be
+     * locked manually while it is traversed; copying under the lock keeps the tracker calls outside of it.
+     */
+    private List<ILSMOperationTracker> snapshotModifiedIndexes() {
+        synchronized (modifiedIndexes) {
+            return new ArrayList<>(modifiedIndexes);
+        }
+    }
+
+    /**
+     * Deletes the memory component of every index modified by this transaction.
+     */
+    private void deleteUncommittedRecords() {
+        for (ILSMOperationTracker opTrackerRef : snapshotModifiedIndexes()) {
+            PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
+            try {
+                primaryIndexOpTracker.deleteMemoryComponent();
+            } catch (HyracksDataException e) {
+                throw new ACIDException(e);
+            }
+        }
+    }
+
+    /**
+     * Triggers a flush on every index modified by this transaction and blocks until all of the
+     * scheduled flushes have been waited for.
+     */
+    private void ensureDurable() {
+        List<FlushOperation> flushes = new ArrayList<>();
+        // triggerScheduleFlush requires a log record argument; an empty one is passed since nothing is logged here
+        LogRecord dummyLogRecord = new LogRecord();
+        try {
+            for (ILSMOperationTracker opTrackerRef : snapshotModifiedIndexes()) {
+                PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
+                primaryIndexOpTracker.triggerScheduleFlush(dummyLogRecord);
+                flushes.addAll(primaryIndexOpTracker.getScheduledFlushes());
+            }
+            LSMIndexUtil.waitFor(flushes);
+        } catch (HyracksDataException e) {
+            throw new ACIDException(e);
+        }
+    }
+
+    /**
+     * @return always {@code false}: this context type does not use the write-ahead log
+     */
+    @Override
+    public boolean hasWAL() {
+        return false;
+    }
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 083c26b795..870a76b1c8 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -18,7 +18,10 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -38,6 +41,7 @@ public class AtomicTransactionContext extends 
AbstractTransactionContext {
     private final Map<Long, ILSMOperationTracker> opTrackers = new 
ConcurrentHashMap<>();
     private final Map<Long, AtomicInteger> indexPendingOps = new 
ConcurrentHashMap<>();
     private final Map<Long, IModificationOperationCallback> callbacks = new 
ConcurrentHashMap<>();
+    protected final Set<ILSMOperationTracker> modifiedIndexes = 
Collections.synchronizedSet(new HashSet<>());
 
     public AtomicTransactionContext(TxnId txnId) {
         super(txnId);
@@ -64,6 +68,7 @@ public class AtomicTransactionContext extends 
AbstractTransactionContext {
     @Override
     public void beforeOperation(long resourceId) {
         indexPendingOps.get(resourceId).incrementAndGet();
+        modifiedIndexes.add(opTrackers.get(resourceId));
     }
 
     @Override
@@ -110,4 +115,9 @@ public class AtomicTransactionContext extends 
AbstractTransactionContext {
         AtomicTransactionContext that = (AtomicTransactionContext) o;
         return this.txnId.equals(that.txnId);
     }
+
+    /**
+     * @return always {@code true} for this context type
+     */
+    @Override
+    public boolean hasWAL() {
+        return true;
+    }
 }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index be5874ed48..2e5f6dd415 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -113,4 +113,9 @@ public class EntityLevelTransactionContext extends 
AbstractTransactionContext {
         EntityLevelTransactionContext that = (EntityLevelTransactionContext) o;
         return this.txnId.equals(that.txnId);
     }
+
+    /**
+     * @return always {@code true} for this context type
+     */
+    @Override
+    public boolean hasWAL() {
+        return true;
+    }
 }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
index 4a465a4de1..1076086346 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
@@ -34,6 +34,8 @@ public class TransactionContextFactory {
         switch (atomicityLevel) {
             case ATOMIC:
                 return new AtomicTransactionContext(txnId);
+            case ATOMIC_NO_WAL:
+                return new AtomicNoWALTransactionContext(txnId);
             case ENTITY_LEVEL:
                 return new EntityLevelTransactionContext(txnId);
             default:
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index ee659628be..8bbfbfdb85 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -81,9 +81,11 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
         final ITransactionContext txnCtx = getTransactionContext(txnId);
         try {
             if (txnCtx.isWriteTxn()) {
-                LogRecord logRecord = new LogRecord();
-                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, 
true);
-                txnSubsystem.getLogManager().log(logRecord);
+                if (txnCtx.hasWAL()) {
+                    LogRecord logRecord = new LogRecord();
+                    TransactionUtil.formJobTerminateLogRecord(txnCtx, 
logRecord, true);
+                    txnSubsystem.getLogManager().log(logRecord);
+                }
                 txnCtx.setTxnState(ITransactionManager.COMMITTED);
             }
         } catch (Exception e) {
@@ -103,13 +105,15 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
         final ITransactionContext txnCtx = getTransactionContext(txnId);
         try {
             if (txnCtx.isWriteTxn()) {
-                if (txnCtx.getFirstLSN() != TERMINAL_LSN) {
-                    LogRecord logRecord = new LogRecord();
-                    TransactionUtil.formJobTerminateLogRecord(txnCtx, 
logRecord, false);
-                    txnSubsystem.getLogManager().log(logRecord);
-                    txnSubsystem.getCheckpointManager().secure(txnId);
+                if (txnCtx.hasWAL()) {
+                    if (txnCtx.getFirstLSN() != TERMINAL_LSN) {
+                        LogRecord logRecord = new LogRecord();
+                        TransactionUtil.formJobTerminateLogRecord(txnCtx, 
logRecord, false);
+                        txnSubsystem.getLogManager().log(logRecord);
+                        txnSubsystem.getCheckpointManager().secure(txnId);
+                    }
+                    
txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
                 }
-                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
                 txnCtx.setTxnState(ITransactionManager.ABORTED);
             }
         } catch (HyracksDataException e) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 63226ac7d6..10f16372a7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -23,7 +23,6 @@ import java.io.FilenameFilter;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.WritableByteChannel;
-import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
@@ -125,6 +124,13 @@ public interface IIOManager extends Closeable {
 
     Set<FileReference> list(FileReference dir) throws HyracksDataException;
 
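+    /**
+     * Lists the files matching {@code filter} recursively starting from {@code dir}
+     * @param dir the directory to start listing from
+     * @param filter the filter a file must satisfy to be included in the result
+     * @return the matching files
+     * @throws HyracksDataException if the files cannot be listed
+     */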
     Set<FileReference> list(FileReference dir, FilenameFilter filter) throws 
HyracksDataException;
 
     void overwrite(FileReference fileRef, byte[] bytes) throws 
ClosedByInterruptException, HyracksDataException;
@@ -136,9 +142,6 @@ public interface IIOManager extends Closeable {
 
     void deleteDirectory(FileReference root) throws HyracksDataException;
 
-    // TODO: Remove and use list
-    Collection<FileReference> getMatchingFiles(FileReference root, 
FilenameFilter filter) throws HyracksDataException;
-
     boolean exists(FileReference fileRef) throws HyracksDataException;
 
     void create(FileReference fileRef) throws HyracksDataException;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 7644a306bd..05e65449bc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -29,8 +29,11 @@ import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -194,4 +197,18 @@ public class IoUtil {
     public static String getFileNameFromPath(String path) {
         return path.substring(path.lastIndexOf('/') + 1);
     }
+
+    /**
+     * Lists the direct children of {@code root} that are accepted by {@code filter}.
+     *
+     * @param root the directory whose children are listed
+     * @param filter the filter applied to each child name
+     * @return the matching children as references resolved against {@code root}; empty if nothing matches or
+     *         if the underlying listing returns {@code null}
+     * @throws IllegalArgumentException if {@code root} is not a directory
+     * @throws NullPointerException if {@code filter} is {@code null}
+     */
+    public static Collection<FileReference> getMatchingChildren(FileReference root, FilenameFilter filter) {
+        if (!root.getFile().isDirectory()) {
+            throw new IllegalArgumentException("Parameter 'root' is not a directory: " + root);
+        }
+        Objects.requireNonNull(filter);
+        List<FileReference> files = new ArrayList<>();
+        String[] matchingFiles = root.getFile().list(filter);
+        if (matchingFiles != null) {
+            // File#list yields bare child names, so each one is resolved against root (not the device mount)
+            files.addAll(Arrays.stream(matchingFiles).map(root::getChild).collect(Collectors.toList()));
+        }
+        return files;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 19fbff37ec..1733ad47b1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -517,27 +517,13 @@ public class IOManager implements IIOManager {
 
     @Override
     public Set<FileReference> list(FileReference dir, FilenameFilter filter) 
throws HyracksDataException {
-        /*
-         * Throws an error if this abstract pathname does not denote a 
directory, or if an I/O error occurs.
-         * Returns an empty set if the file does not exist, otherwise, returns 
the files in the specified directory
-         */
         Set<FileReference> listedFiles = new HashSet<>();
         if (!dir.getFile().exists()) {
             return listedFiles;
         }
-
-        String[] files = dir.getFile().list(filter);
-        if (files == null) {
-            if (!dir.getFile().canRead()) {
-                throw HyracksDataException.create(ErrorCode.CANNOT_READ_FILE, 
dir);
-            } else if (!dir.getFile().isDirectory()) {
-                throw 
HyracksDataException.create(ErrorCode.FILE_IS_NOT_DIRECTORY, dir);
-            }
-            throw 
HyracksDataException.create(ErrorCode.UNIDENTIFIED_IO_ERROR_READING_FILE, dir);
-        }
-
-        for (String file : files) {
-            listedFiles.add(dir.getChild(file));
+        Collection<File> files = 
IoUtil.getMatchingFiles(dir.getFile().toPath(), filter);
+        for (File file : files) {
+            listedFiles.add(resolveAbsolutePath(file.getAbsolutePath()));
         }
         return listedFiles;
     }
@@ -578,23 +564,6 @@ public class IOManager implements IIOManager {
         }
     }
 
-    @Override
-    public Collection<FileReference> getMatchingFiles(FileReference root, 
FilenameFilter filter)
-            throws HyracksDataException {
-        File rootFile = root.getFile();
-        if (!rootFile.exists() || !rootFile.isDirectory()) {
-            return Collections.emptyList();
-        }
-
-        Collection<File> files = IoUtil.getMatchingFiles(rootFile.toPath(), 
filter);
-        Set<FileReference> fileReferences = new HashSet<>();
-        for (File file : files) {
-            fileReferences.add(resolveAbsolutePath(file.getAbsolutePath()));
-        }
-
-        return fileReferences;
-    }
-
     @Override
     public boolean exists(FileReference fileRef) throws HyracksDataException {
         return fileRef.getFile().exists();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index 45bfed16dd..1c0d7b49d5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.common.IIndex;
@@ -101,7 +100,7 @@ public class IndexBuilder implements IIndexBuilder {
                     LOGGER.warn(
                             "Deleting {} on index create. The index is not 
registered but the file exists in the filesystem",
                             resolvedResourceRef);
-                    IoUtil.delete(resolvedResourceRef);
+                    ctx.getIoManager().delete(resolvedResourceRef);
                 }
                 index = resource.createInstance(ctx);
             }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index 6a4f10dff0..1bceea3852 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -42,8 +42,8 @@ public abstract class AbstractBufferedFileIOManager {
 
     protected final BufferCache bufferCache;
     protected final IPageReplacementStrategy pageReplacementStrategy;
+    protected final IOManager ioManager;
     private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache;
-    private final IOManager ioManager;
 
     private IFileHandle fileHandle;
     private volatile boolean hasOpen;
@@ -193,7 +193,7 @@ public abstract class AbstractBufferedFileIOManager {
         }
     }
 
-    public static void deleteFile(FileReference fileRef) throws 
HyracksDataException {
+    public static void deleteFile(FileReference fileRef, IIOManager ioManager) 
throws HyracksDataException {
         HyracksDataException savedEx = null;
 
         /*
@@ -206,7 +206,7 @@ public abstract class AbstractBufferedFileIOManager {
                 final CompressedFileReference cFileRef = 
(CompressedFileReference) fileRef;
                 final FileReference lafFileRef = 
cFileRef.getLAFFileReference();
                 if (lafFileRef.getFile().exists()) {
-                    IoUtil.delete(lafFileRef);
+                    ioManager.delete(lafFileRef);
                 }
             }
         } catch (HyracksDataException e) {
@@ -214,7 +214,7 @@ public abstract class AbstractBufferedFileIOManager {
         }
 
         try {
-            IoUtil.delete(fileRef);
+            ioManager.delete(fileRef);
         } catch (HyracksDataException e) {
             if (savedEx != null) {
                 savedEx.addSuppressed(e);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 70500e5845..10284f2c4d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -43,7 +43,6 @@ import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
 import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.api.util.IoUtil;
 import 
org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.storage.common.file.IFileMapManager;
@@ -761,7 +760,7 @@ public class BufferCache implements IBufferCacheInternal, 
ILifeCycleComponent, I
         } catch (Exception e) {
             // If file registration failed for any reason, we need to undo the 
file creation
             try {
-                IoUtil.delete(fileRef);
+                ioManager.delete(fileRef);
             } catch (Exception deleteException) {
                 e.addSuppressed(deleteException);
             }
@@ -960,7 +959,7 @@ public class BufferCache implements IBufferCacheInternal, 
ILifeCycleComponent, I
         if (mapped) {
             deleteFile(fileId);
         } else {
-            BufferedFileHandle.deleteFile(fileRef);
+            BufferedFileHandle.deleteFile(fileRef, ioManager);
         }
     }
 
@@ -991,7 +990,7 @@ public class BufferCache implements IBufferCacheInternal, 
ILifeCycleComponent, I
                         fInfo.markAsDeleted();
                     }
                 } finally {
-                    BufferedFileHandle.deleteFile(fileRef);
+                    BufferedFileHandle.deleteFile(fileRef, ioManager);
                 }
             }
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
index 7d0cc62878..d78ac244f9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
@@ -23,6 +23,7 @@ import java.util.EnumSet;
 
 import org.apache.hyracks.api.compression.ICompressorDecompressor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal;
@@ -64,6 +65,7 @@ public class CompressedFileManager {
     private final IBufferCache bufferCache;
     private final ICompressorDecompressor compressorDecompressor;
     private final CompressedFileReference fileRef;
+    private final IIOManager ioManager;
 
     private int fileId;
     private State state;
@@ -71,12 +73,13 @@ public class CompressedFileManager {
 
     private LAFWriter lafWriter;
 
-    public CompressedFileManager(IBufferCache bufferCache, 
CompressedFileReference fileRef) {
+    public CompressedFileManager(IBufferCache bufferCache, 
CompressedFileReference fileRef, IIOManager ioManager) {
         state = State.CLOSED;
         totalNumOfPages = 0;
         this.bufferCache = bufferCache;
         this.fileRef = fileRef;
         this.compressorDecompressor = fileRef.getCompressorDecompressor();
+        this.ioManager = ioManager;
     }
 
     /* ************************
@@ -99,7 +102,7 @@ public class CompressedFileManager {
         ensureState(CLOSED);
 
         boolean open = false;
-        if (fileRef.getLAFFileReference().getFile().exists()) {
+        if (ioManager.exists(fileRef.getLAFFileReference())) {
             fileId = bufferCache.openFile(fileRef.getLAFFileReference());
             open = true;
         } else {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index b6178c5f12..01811c7945 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -157,7 +157,7 @@ public class CompressedBufferedFileHandle extends 
BufferedFileHandle {
     @Override
     public void open(FileReference fileRef) throws HyracksDataException {
         final CompressedFileReference cFileRef = (CompressedFileReference) 
fileRef;
-        compressedFileManager = new CompressedFileManager(bufferCache, 
cFileRef);
+        compressedFileManager = new CompressedFileManager(bufferCache, 
cFileRef, ioManager);
         compressedFileManager.open();
         super.open(fileRef);
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
index ddb57171a1..de8bdc8c75 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.storage.am.common;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.util.Log4j2Monitor;
 import org.apache.logging.log4j.Level;
@@ -97,7 +96,6 @@ public abstract class AbstractIndexLifecycleTest {
         index.destroy();
         Assert.assertFalse(persistentStateExists());
         index.destroy();
-        Assert.assertTrue(Log4j2Monitor.count(IoUtil.FILE_NOT_FOUND_MSG) > 0);
         Assert.assertFalse(persistentStateExists());
     }
 

Reply via email to