This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 63c7093fdb [ASTERIXDB-3196][TX] Support atomic Txn with no WAL
63c7093fdb is described below
commit 63c7093fdbf23083e4b940180196bee37e5d818f
Author: Murtadha Hubail <[email protected]>
AuthorDate: Wed May 31 04:17:28 2023 +0300
[ASTERIXDB-3196][TX] Support atomic Txn with no WAL
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
- Prevent concurrent metadata catalog modification.
- Introduce atomic txn without WAL that will persist
all records on commit or delete on abort.
- Compute-storage separation fixes.
Change-Id: Icfd034a4dc0b6464564e2129a166e4ceb0dc7b41
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17583
Reviewed-by: Murtadha Hubail <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
---
.../asterix/app/nc/IndexCheckpointManager.java | 7 +-
.../api/common/CloudStorageIntegrationUtil.java | 2 +-
.../TestPrimaryIndexOperationTrackerFactory.java | 11 ---
.../org/apache/asterix/cloud/CloudIOManager.java | 8 +-
.../context/PrimaryIndexOperationTracker.java | 27 +++++++
.../common/transactions/ITransactionContext.java | 14 ++++
.../common/transactions/ITransactionManager.java | 4 +
.../org/apache/asterix/metadata/MetadataNode.java | 26 +++++--
.../messaging/CheckpointPartitionIndexesTask.java | 2 +-
.../opcallbacks/NoOpModificationOpCallback.java | 46 +++++++++++
.../PersistentLocalResourceRepository.java | 27 +++----
.../transaction/AbstractTransactionContext.java | 21 ++++++
.../transaction/AtomicNoWALTransactionContext.java | 88 ++++++++++++++++++++++
.../transaction/AtomicTransactionContext.java | 10 +++
.../transaction/EntityLevelTransactionContext.java | 5 ++
.../transaction/TransactionContextFactory.java | 2 +
.../service/transaction/TransactionManager.java | 22 +++---
.../java/org/apache/hyracks/api/io/IIOManager.java | 11 ++-
.../java/org/apache/hyracks/api/util/IoUtil.java | 17 +++++
.../apache/hyracks/control/nc/io/IOManager.java | 37 +--------
.../storage/am/common/build/IndexBuilder.java | 3 +-
.../buffercache/AbstractBufferedFileIOManager.java | 8 +-
.../storage/common/buffercache/BufferCache.java | 7 +-
.../compression/file/CompressedFileManager.java | 7 +-
.../common/file/CompressedBufferedFileHandle.java | 2 +-
.../am/common/AbstractIndexLifecycleTest.java | 2 -
26 files changed, 312 insertions(+), 104 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 2ed163812e..372cf69c7c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -154,7 +154,7 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
}
if (checkpoints.isEmpty()) {
LOGGER.warn("Couldn't find any checkpoint file for index {}.
Content of dir are {}.", indexPath,
- ioManager.getMatchingFiles(indexPath,
IoUtil.NO_OP_FILTER).toString());
+ ioManager.list(indexPath, IoUtil.NO_OP_FILTER).toString());
throw new IllegalStateException("Couldn't find any checkpoints for
resource: " + indexPath);
}
checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
@@ -182,7 +182,7 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
private List<IndexCheckpoint> getCheckpoints() throws
ClosedByInterruptException, HyracksDataException {
List<IndexCheckpoint> checkpoints = new ArrayList<>();
- final Collection<FileReference> checkpointFiles =
ioManager.getMatchingFiles(indexPath, CHECKPOINT_FILE_FILTER);
+ final Collection<FileReference> checkpointFiles =
ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
if (!checkpointFiles.isEmpty()) {
for (FileReference checkpointFile : checkpointFiles) {
try {
@@ -229,8 +229,7 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
private void deleteHistory(long latestId, int historyToKeep) {
try {
- final Collection<FileReference> checkpointFiles =
- ioManager.getMatchingFiles(indexPath,
CHECKPOINT_FILE_FILTER);
+ final Collection<FileReference> checkpointFiles =
ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
if (!checkpointFiles.isEmpty()) {
for (FileReference checkpointFile : checkpointFiles) {
if (getCheckpointIdFromFileName(checkpointFile) <
(latestId - historyToKeep)) {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
index cc69bb15c0..eb2b05bcb8 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
@@ -27,7 +27,7 @@ public class CloudStorageIntegrationUtil extends
AsterixHyracksIntegrationUtil {
public static final String CONFIG_FILE = joinPath(RESOURCES_PATH,
"cc-cloud-storage.conf");
public static void main(String[] args) throws Exception {
- // CloudUtils.startS3CloudEnvironment();
+ CloudUtils.startS3CloudEnvironment();
final AsterixHyracksIntegrationUtil integrationUtil = new
AsterixHyracksIntegrationUtil();
try {
integrationUtil.run(Boolean.getBoolean("cleanup.start"),
Boolean.getBoolean("cleanup.shutdown"),
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index c4390fa52c..38fdf5689d 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -19,7 +19,6 @@
package org.apache.asterix.test.dataflow;
import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -70,20 +69,10 @@ public class TestPrimaryIndexOperationTrackerFactory
extends PrimaryIndexOperati
}
}
- static void setFinal(Field field, Object obj, Object newValue) throws
Exception {
- field.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
- field.set(obj, newValue);
- }
-
@SuppressWarnings({ "rawtypes", "unchecked" })
static void replaceMapEntry(Field field, Object obj, Object key, Object
value)
throws Exception, IllegalAccessException {
field.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
Map map = (Map) field.get(obj);
map.put(key, value);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
index f87adbaefd..799da68333 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
@@ -185,7 +185,8 @@ public class CloudIOManager extends IOManager {
// Add the remaining files that are not stored locally (if any)
for (String cloudFile : cloudFiles) {
-
localFiles.add(dir.getChild(IoUtil.getFileNameFromPath(cloudFile)));
+ localFiles.add(new FileReference(dir.getDeviceHandle(),
+
cloudFile.substring(cloudFile.indexOf(dir.getRelativePath()))));
}
return new HashSet<>(localFiles);
}
@@ -244,12 +245,11 @@ public class CloudIOManager extends IOManager {
return super.doSyncRead(fHandle, offset, data);
}
- // TODO: We need to download this too
@Override
public byte[] readAllBytes(FileReference fileRef) throws
HyracksDataException {
if (!fileRef.getFile().exists()) {
- // TODO(htowaileb): if it does not exist, download (lazy)
- // TODO(htowaileb): make sure downloading the file is synchronous
since many can request it at the same time
+ IFileHandle open = open(fileRef, FileReadWriteMode.READ_WRITE,
FileSyncMode.METADATA_SYNC_DATA_SYNC);
+ fileRef = open.getFileReference();
}
return super.readAllBytes(fileRef);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b0d8e02af7..2704e6429b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +38,7 @@ import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -297,6 +299,31 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker implement
return "Dataset (" + datasetID + "), Partition (" + partition + ")";
}
+ public void deleteMemoryComponent() throws HyracksDataException {
+ Set<ILSMIndex> indexes =
dsInfo.getDatasetPartitionOpenIndexes(partition);
+ ILSMIndex primaryLsmIndex = null;
+ for (ILSMIndex lsmIndex : indexes) {
+ if (lsmIndex.isPrimaryIndex()) {
+ if (lsmIndex.isCurrentMutableComponentEmpty()) {
+ LOGGER.info("Primary index on dataset {} and partition {}
is empty... skipping delete",
+ dsInfo.getDatasetID(), partition);
+ return;
+ }
+ primaryLsmIndex = lsmIndex;
+ break;
+ }
+ }
+ Objects.requireNonNull(primaryLsmIndex, "no primary index found in " +
indexes);
+ idGenerator.refresh();
+ ILSMComponentId nextComponentId = idGenerator.getId();
+ Map<String, Object> flushMap = new HashMap<>();
+ flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
+ flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID,
nextComponentId);
+ ILSMIndexAccessor accessor =
primaryLsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.getOpContext().setParameters(flushMap);
+ accessor.deleteComponents(c -> c.getType() ==
ILSMComponent.LSMComponentType.MEMORY);
+ }
+
private boolean canSafelyFlush() {
return numActiveOperations.get() == 0;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index 940535f506..34d7caa1c9 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.transactions;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -145,4 +147,16 @@ public interface ITransactionContext {
* so that any resources held by the transaction may be released
*/
void complete();
+
+ /**
+ * Acquires {@code lock} write lock and sets the transactions as a write
transaction
+ * @param lock
+ */
+ void acquireExclusiveWriteLock(ReentrantLock lock);
+
+ /**
+ * Determines if this tx uses WAL
+ * @return true if this tx uses WAL. Otherwise, false.
+ */
+ boolean hasWAL();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 396d3f6c8e..30c693d219 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -42,6 +42,10 @@ public interface ITransactionManager {
* all records are committed or nothing
*/
ATOMIC,
+ /**
+ * all records are committed and persisted to disk or nothing
+ */
+ ATOMIC_NO_WAL,
/**
* any record with entity commit log
*/
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 2195f88cba..522fd32556 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -116,6 +117,7 @@ import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.fulltext.FullTextConfigDescriptor;
import
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
+import
org.apache.asterix.transaction.management.opcallbacks.NoOpModificationOpCallback;
import
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
import
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallback;
import
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
@@ -161,6 +163,8 @@ public class MetadataNode implements IMetadataNode {
private transient MetadataTupleTranslatorProvider tupleTranslatorProvider;
// extension only
private Map<ExtensionMetadataDatasetId, ExtensionMetadataDataset<?>>
extensionDatasets;
+ private final ReentrantLock metadataModificationLock = new
ReentrantLock(true);
+ private boolean atomicNoWAL;
public static final MetadataNode INSTANCE = new MetadataNode();
@@ -184,6 +188,7 @@ public class MetadataNode implements IMetadataNode {
}
}
this.txnIdFactory = new CachingTxnIdFactory(runtimeContext);
+ atomicNoWAL = runtimeContext.isCloudDeployment();
}
public int getMetadataStoragePartition() {
@@ -192,7 +197,8 @@ public class MetadataNode implements IMetadataNode {
@Override
public void beginTransaction(TxnId transactionId) {
- TransactionOptions options = new
TransactionOptions(AtomicityLevel.ATOMIC);
+ AtomicityLevel lvl = atomicNoWAL ? AtomicityLevel.ATOMIC_NO_WAL :
AtomicityLevel.ATOMIC;
+ TransactionOptions options = new TransactionOptions(lvl);
transactionSubsystem.getTransactionManager().beginTransaction(transactionId,
options);
}
@@ -501,11 +507,7 @@ public class MetadataNode implements IMetadataNode {
if (!force) {
confirmFullTextFilterCanBeDeleted(txnId, dataverseName,
filterName);
}
-
try {
- FullTextFilterMetadataEntityTupleTranslator translator =
-
tupleTranslatorProvider.getFullTextFilterTupleTranslator(true);
-
ITupleReference key =
createTuple(dataverseName.getCanonicalForm(), filterName);
deleteTupleFromIndex(txnId,
MetadataPrimaryIndexes.FULL_TEXT_FILTER_DATASET, key);
} catch (HyracksDataException e) {
@@ -612,7 +614,7 @@ public class MetadataNode implements IMetadataNode {
IModificationOperationCallback modCallback =
createIndexModificationCallback(op, txnCtx, metadataIndex);
IIndexAccessParameters iap = new
IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
- txnCtx.setWriteTxn(true);
+ txnCtx.acquireExclusiveWriteLock(metadataModificationLock);
txnCtx.register(metadataIndex.getResourceId(),
StoragePathUtil.getPartitionNumFromRelativePath(resourceName), lsmIndex,
modCallback,
metadataIndex.isPrimaryIndex());
@@ -640,6 +642,12 @@ public class MetadataNode implements IMetadataNode {
switch (indexOp) {
case INSERT:
case DELETE:
+ if (!txnCtx.hasWAL()) {
+ return new
NoOpModificationOpCallback(metadataIndex.getDatasetId(),
+ metadataIndex.getPrimaryKeyIndexes(), txnCtx,
transactionSubsystem.getLockManager(),
+ transactionSubsystem,
metadataIndex.getResourceId(), metadataStoragePartition,
+ ResourceType.LSM_BTREE, indexOp);
+ }
/*
* Regardless of the index type (primary or secondary index),
secondary index modification
* callback is given. This is still correct since metadata
index operation doesn't require
@@ -650,6 +658,12 @@ public class MetadataNode implements IMetadataNode {
transactionSubsystem, metadataIndex.getResourceId(),
metadataStoragePartition,
ResourceType.LSM_BTREE, indexOp);
case UPSERT:
+ if (!txnCtx.hasWAL()) {
+ return new
NoOpModificationOpCallback(metadataIndex.getDatasetId(),
+ metadataIndex.getPrimaryKeyIndexes(), txnCtx,
transactionSubsystem.getLockManager(),
+ transactionSubsystem,
metadataIndex.getResourceId(), metadataStoragePartition,
+ ResourceType.LSM_BTREE, indexOp);
+ }
return new
UpsertOperationCallback(metadataIndex.getDatasetId(),
metadataIndex.getPrimaryKeyIndexes(),
txnCtx, transactionSubsystem.getLockManager(),
transactionSubsystem,
metadataIndex.getResourceId(),
metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index a6ba0e2916..11bac0d99e 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -71,7 +71,7 @@ public class CheckpointPartitionIndexesTask implements
IReplicaTask {
// Get most recent sequence of existing files to avoid deletion
FileReference indexPath = StoragePathUtil.getIndexPath(ioManager,
ref);
Collection<FileReference> files =
- ioManager.getMatchingFiles(indexPath,
AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+ ioManager.list(indexPath,
AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
if (files == null) {
throw HyracksDataException
.create(new IOException(indexPath + " is not a
directory or an IO Error occurred"));
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java
new file mode 100644
index 0000000000..ad77939a66
--- /dev/null
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/NoOpModificationOpCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.opcallbacks;
+
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class NoOpModificationOpCallback extends
AbstractIndexModificationOperationCallback {
+
+ public NoOpModificationOpCallback(DatasetId datasetId, int[]
primaryKeyFields, ITransactionContext txnCtx,
+ ILockManager lockManager, ITransactionSubsystem txnSubsystem, long
resourceId, int resourcePartition,
+ byte resourceType, Operation indexOp) {
+ super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem,
resourceId, resourcePartition,
+ resourceType, indexOp);
+ }
+
+ @Override
+ public void before(ITupleReference tuple) throws HyracksDataException {
+ // no op
+ }
+
+ @Override
+ public void found(ITupleReference before, ITupleReference after) throws
HyracksDataException {
+ // no op
+ }
+}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 25b961017c..4e71b1c629 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -63,6 +63,7 @@ import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import
org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
@@ -240,7 +241,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
List<FileReference> roots) throws HyracksDataException {
Map<Long, LocalResource> resourcesMap = new HashMap<>();
for (FileReference root : roots) {
- final Collection<FileReference> files =
ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
+ final Collection<FileReference> files = ioManager.list(root,
METADATA_FILES_FILTER);
try {
for (FileReference file : files) {
final LocalResource localResource =
readLocalResource(file);
@@ -274,7 +275,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
public synchronized void deleteInvalidIndexes(Predicate<LocalResource>
filter) throws HyracksDataException {
for (FileReference root : storageRoots) {
- final Collection<FileReference> files =
ioManager.getMatchingFiles(root, METADATA_FILES_FILTER);
+ final Collection<FileReference> files = ioManager.list(root,
METADATA_FILES_FILTER);
try {
for (FileReference file : files) {
final LocalResource localResource =
readLocalResource(file);
@@ -452,7 +453,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
private List<String> getIndexFiles(FileReference indexDir) throws
HyracksDataException {
final List<String> indexFiles = new ArrayList<>();
- Collection<FileReference> indexFilteredFiles =
ioManager.getMatchingFiles(indexDir, LSM_INDEX_FILES_FILTER);
+ Collection<FileReference> indexFilteredFiles =
ioManager.list(indexDir, LSM_INDEX_FILES_FILTER);
indexFilteredFiles.stream().map(FileReference::getAbsolutePath).forEach(indexFiles::add);
return indexFiles;
}
@@ -492,8 +493,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
public synchronized void deleteCorruptedResources() throws
HyracksDataException {
for (FileReference root : storageRoots) {
- final Collection<FileReference> metadataMaskFiles =
- ioManager.getMatchingFiles(root,
METADATA_MASK_FILES_FILTER);
+ final Collection<FileReference> metadataMaskFiles =
ioManager.list(root, METADATA_MASK_FILES_FILTER);
for (FileReference metadataMaskFile : metadataMaskFiles) {
final FileReference resourceFile =
metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
ioManager.delete(resourceFile);
@@ -503,7 +503,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
private void deleteIndexMaskedFiles(FileReference index) throws
IOException {
- Collection<FileReference> masks = ioManager.getMatchingFiles(index,
MASK_FILES_FILTER);
+ Collection<FileReference> masks = ioManager.list(index,
MASK_FILES_FILTER);
for (FileReference mask : masks) {
deleteIndexMaskedFiles(index, mask);
// delete the mask itself
@@ -520,7 +520,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
private void deleteIndexInvalidComponents(FileReference index) throws
IOException, ParseException {
- final Collection<FileReference> indexComponentFiles =
ioManager.getMatchingFiles(index, COMPONENT_FILES_FILTER);
+ final Collection<FileReference> indexComponentFiles =
ioManager.list(index, COMPONENT_FILES_FILTER);
if (indexComponentFiles == null) {
throw new IOException(index + " doesn't exist or an IO error
occurred");
}
@@ -550,10 +550,10 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
Collection<FileReference> maskedFiles;
if (isComponentMask(mask)) {
final String componentId =
mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length());
- maskedFiles = ioManager.getMatchingFiles(index, (dir, name) ->
name.startsWith(componentId));
+ maskedFiles = ioManager.list(index, (dir, name) ->
name.startsWith(componentId));
} else {
final String maskedFileName =
mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length());
- maskedFiles = ioManager.getMatchingFiles(index, (dir, name) ->
name.equals(maskedFileName));
+ maskedFiles = ioManager.list(index, (dir, name) ->
name.equals(maskedFileName));
}
if (maskedFiles != null) {
for (FileReference maskedFile : maskedFiles) {
@@ -643,14 +643,11 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
}
- public synchronized List<FileReference> getOnDiskPartitions() throws
HyracksDataException {
+ public synchronized List<FileReference> getOnDiskPartitions() {
List<FileReference> onDiskPartitions = new ArrayList<>();
for (FileReference root : storageRoots) {
- Collection<FileReference> partitions = ioManager.list(root, (dir,
name) -> dir != null && dir.isDirectory()
- && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
- if (partitions != null) {
- onDiskPartitions.addAll(partitions);
- }
+ onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir,
name) -> dir != null && dir.isDirectory()
+ &&
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)));
}
return onDiskPartitions;
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 104f9a78ac..8a1856ced5 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -23,8 +23,10 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.asterix.common.context.ITransactionOperationTracker;
+import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TxnId;
@@ -42,6 +44,7 @@ public abstract class AbstractTransactionContext implements
ITransactionContext
private final AtomicInteger txnState;
private final AtomicBoolean isWriteTxn;
private volatile boolean isTimeout;
+ private ReentrantLock exclusiveLock;
protected AbstractTransactionContext(TxnId txnId) {
this.txnId = txnId;
@@ -89,6 +92,21 @@ public abstract class AbstractTransactionContext implements
ITransactionContext
return isTimeout;
}
+ @Override
+ public void acquireExclusiveWriteLock(ReentrantLock exclusiveLock) {
+ if (isWriteTxn.get()) {
+ return;
+ }
+ try {
+ exclusiveLock.lockInterruptibly();
+ this.exclusiveLock = exclusiveLock;
+ setWriteTxn(true);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ACIDException(e);
+ }
+ }
+
@Override
public void setWriteTxn(boolean isWriteTxn) {
this.isWriteTxn.set(isWriteTxn);
@@ -114,6 +132,9 @@ public abstract class AbstractTransactionContext implements
ITransactionContext
synchronized (txnOpTrackers) {
txnOpTrackers.forEach((resource, opTracker) ->
opTracker.afterTransaction(resource));
}
+ if (exclusiveLock != null) {
+ exclusiveLock.unlock();
+ }
}
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
new file mode 100644
index 0000000000..cfa39c61c0
--- /dev/null
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
+
+ public AtomicNoWALTransactionContext(TxnId txnId) {
+ super(txnId);
+ }
+
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ final int txnState = getTxnState();
+ switch (txnState) {
+ case ITransactionManager.ABORTED:
+ deleteUncommittedRecords();
+ break;
+ case ITransactionManager.COMMITTED:
+ ensureDurable();
+ break;
+ default:
+ throw new IllegalStateException("invalid state in txn clean
up: " + getTxnState());
+ }
+ }
+
+ private void deleteUncommittedRecords() {
+ for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+ PrimaryIndexOperationTracker primaryIndexOpTracker =
(PrimaryIndexOperationTracker) opTrackerRef;
+ try {
+ primaryIndexOpTracker.deleteMemoryComponent();
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ }
+ }
+
+ private void ensureDurable() {
+ List<FlushOperation> flushes = new ArrayList<>();
+ LogRecord dummyLogRecord = new LogRecord();
+ try {
+ for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+ PrimaryIndexOperationTracker primaryIndexOpTracker =
(PrimaryIndexOperationTracker) opTrackerRef;
+ primaryIndexOpTracker.triggerScheduleFlush(dummyLogRecord);
+ flushes.addAll(primaryIndexOpTracker.getScheduledFlushes());
+ }
+ LSMIndexUtil.waitFor(flushes);
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ }
+
+ @Override
+ public boolean hasWAL() {
+ return false;
+ }
+}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 083c26b795..870a76b1c8 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -18,7 +18,10 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +41,7 @@ public class AtomicTransactionContext extends
AbstractTransactionContext {
private final Map<Long, ILSMOperationTracker> opTrackers = new
ConcurrentHashMap<>();
private final Map<Long, AtomicInteger> indexPendingOps = new
ConcurrentHashMap<>();
private final Map<Long, IModificationOperationCallback> callbacks = new
ConcurrentHashMap<>();
+ protected final Set<ILSMOperationTracker> modifiedIndexes =
Collections.synchronizedSet(new HashSet<>());
public AtomicTransactionContext(TxnId txnId) {
super(txnId);
@@ -64,6 +68,7 @@ public class AtomicTransactionContext extends
AbstractTransactionContext {
@Override
public void beforeOperation(long resourceId) {
indexPendingOps.get(resourceId).incrementAndGet();
+ modifiedIndexes.add(opTrackers.get(resourceId));
}
@Override
@@ -110,4 +115,9 @@ public class AtomicTransactionContext extends
AbstractTransactionContext {
AtomicTransactionContext that = (AtomicTransactionContext) o;
return this.txnId.equals(that.txnId);
}
+
+ @Override
+ public boolean hasWAL() {
+ return true;
+ }
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index be5874ed48..2e5f6dd415 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -113,4 +113,9 @@ public class EntityLevelTransactionContext extends
AbstractTransactionContext {
EntityLevelTransactionContext that = (EntityLevelTransactionContext) o;
return this.txnId.equals(that.txnId);
}
+
+ @Override
+ public boolean hasWAL() {
+ return true;
+ }
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
index 4a465a4de1..1076086346 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
@@ -34,6 +34,8 @@ public class TransactionContextFactory {
switch (atomicityLevel) {
case ATOMIC:
return new AtomicTransactionContext(txnId);
+ case ATOMIC_NO_WAL:
+ return new AtomicNoWALTransactionContext(txnId);
case ENTITY_LEVEL:
return new EntityLevelTransactionContext(txnId);
default:
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index ee659628be..8bbfbfdb85 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -81,9 +81,11 @@ public class TransactionManager implements
ITransactionManager, ILifeCycleCompon
final ITransactionContext txnCtx = getTransactionContext(txnId);
try {
if (txnCtx.isWriteTxn()) {
- LogRecord logRecord = new LogRecord();
- TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord,
true);
- txnSubsystem.getLogManager().log(logRecord);
+ if (txnCtx.hasWAL()) {
+ LogRecord logRecord = new LogRecord();
+ TransactionUtil.formJobTerminateLogRecord(txnCtx,
logRecord, true);
+ txnSubsystem.getLogManager().log(logRecord);
+ }
txnCtx.setTxnState(ITransactionManager.COMMITTED);
}
} catch (Exception e) {
@@ -103,13 +105,15 @@ public class TransactionManager implements
ITransactionManager, ILifeCycleCompon
final ITransactionContext txnCtx = getTransactionContext(txnId);
try {
if (txnCtx.isWriteTxn()) {
- if (txnCtx.getFirstLSN() != TERMINAL_LSN) {
- LogRecord logRecord = new LogRecord();
- TransactionUtil.formJobTerminateLogRecord(txnCtx,
logRecord, false);
- txnSubsystem.getLogManager().log(logRecord);
- txnSubsystem.getCheckpointManager().secure(txnId);
+ if (txnCtx.hasWAL()) {
+ if (txnCtx.getFirstLSN() != TERMINAL_LSN) {
+ LogRecord logRecord = new LogRecord();
+ TransactionUtil.formJobTerminateLogRecord(txnCtx,
logRecord, false);
+ txnSubsystem.getLogManager().log(logRecord);
+ txnSubsystem.getCheckpointManager().secure(txnId);
+ }
+
txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
}
- txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
txnCtx.setTxnState(ITransactionManager.ABORTED);
}
} catch (HyracksDataException e) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 63226ac7d6..10f16372a7 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -23,7 +23,6 @@ import java.io.FilenameFilter;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.WritableByteChannel;
-import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -125,6 +124,13 @@ public interface IIOManager extends Closeable {
Set<FileReference> list(FileReference dir) throws HyracksDataException;
+ /**
+ * Lists the files matching {@code filter} recursively starting from
{@code dir}
+ * @param dir
+ * @param filter
+ * @return the matching files
+ * @throws HyracksDataException
+ */
Set<FileReference> list(FileReference dir, FilenameFilter filter) throws
HyracksDataException;
void overwrite(FileReference fileRef, byte[] bytes) throws
ClosedByInterruptException, HyracksDataException;
@@ -136,9 +142,6 @@ public interface IIOManager extends Closeable {
void deleteDirectory(FileReference root) throws HyracksDataException;
- // TODO: Remove and use list
- Collection<FileReference> getMatchingFiles(FileReference root,
FilenameFilter filter) throws HyracksDataException;
-
boolean exists(FileReference fileRef) throws HyracksDataException;
void create(FileReference fileRef) throws HyracksDataException;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 7644a306bd..05e65449bc 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -29,8 +29,11 @@ import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -194,4 +197,18 @@ public class IoUtil {
public static String getFileNameFromPath(String path) {
return path.substring(path.lastIndexOf('/') + 1);
}
+
+ public static Collection<FileReference> getMatchingChildren(FileReference
root, FilenameFilter filter) {
+ if (!root.getFile().isDirectory()) {
+ throw new IllegalArgumentException("Parameter 'root' is not a
directory: " + root);
+ }
+ Objects.requireNonNull(filter);
+ List<FileReference> files = new ArrayList<>();
+ String[] matchingFiles = root.getFile().list(filter);
+ if (matchingFiles != null) {
+ files.addAll(Arrays.stream(matchingFiles).map(pDir -> new
FileReference(root.getDeviceHandle(), pDir))
+ .collect(Collectors.toList()));
+ }
+ return files;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 19fbff37ec..1733ad47b1 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -517,27 +517,13 @@ public class IOManager implements IIOManager {
@Override
public Set<FileReference> list(FileReference dir, FilenameFilter filter)
throws HyracksDataException {
- /*
- * Throws an error if this abstract pathname does not denote a
directory, or if an I/O error occurs.
- * Returns an empty set if the file does not exist, otherwise, returns
the files in the specified directory
- */
Set<FileReference> listedFiles = new HashSet<>();
if (!dir.getFile().exists()) {
return listedFiles;
}
-
- String[] files = dir.getFile().list(filter);
- if (files == null) {
- if (!dir.getFile().canRead()) {
- throw HyracksDataException.create(ErrorCode.CANNOT_READ_FILE,
dir);
- } else if (!dir.getFile().isDirectory()) {
- throw
HyracksDataException.create(ErrorCode.FILE_IS_NOT_DIRECTORY, dir);
- }
- throw
HyracksDataException.create(ErrorCode.UNIDENTIFIED_IO_ERROR_READING_FILE, dir);
- }
-
- for (String file : files) {
- listedFiles.add(dir.getChild(file));
+ Collection<File> files =
IoUtil.getMatchingFiles(dir.getFile().toPath(), filter);
+ for (File file : files) {
+ listedFiles.add(resolveAbsolutePath(file.getAbsolutePath()));
}
return listedFiles;
}
@@ -578,23 +564,6 @@ public class IOManager implements IIOManager {
}
}
- @Override
- public Collection<FileReference> getMatchingFiles(FileReference root,
FilenameFilter filter)
- throws HyracksDataException {
- File rootFile = root.getFile();
- if (!rootFile.exists() || !rootFile.isDirectory()) {
- return Collections.emptyList();
- }
-
- Collection<File> files = IoUtil.getMatchingFiles(rootFile.toPath(),
filter);
- Set<FileReference> fileReferences = new HashSet<>();
- for (File file : files) {
- fileReferences.add(resolveAbsolutePath(file.getAbsolutePath()));
- }
-
- return fileReferences;
- }
-
@Override
public boolean exists(FileReference fileRef) throws HyracksDataException {
return fileRef.getFile().exists();
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index 45bfed16dd..1c0d7b49d5 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.common.IIndex;
@@ -101,7 +100,7 @@ public class IndexBuilder implements IIndexBuilder {
LOGGER.warn(
"Deleting {} on index create. The index is not
registered but the file exists in the filesystem",
resolvedResourceRef);
- IoUtil.delete(resolvedResourceRef);
+ ctx.getIoManager().delete(resolvedResourceRef);
}
index = resource.createInstance(ctx);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index 6a4f10dff0..1bceea3852 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -42,8 +42,8 @@ public abstract class AbstractBufferedFileIOManager {
protected final BufferCache bufferCache;
protected final IPageReplacementStrategy pageReplacementStrategy;
+ protected final IOManager ioManager;
private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache;
- private final IOManager ioManager;
private IFileHandle fileHandle;
private volatile boolean hasOpen;
@@ -193,7 +193,7 @@ public abstract class AbstractBufferedFileIOManager {
}
}
- public static void deleteFile(FileReference fileRef) throws
HyracksDataException {
+ public static void deleteFile(FileReference fileRef, IIOManager ioManager)
throws HyracksDataException {
HyracksDataException savedEx = null;
/*
@@ -206,7 +206,7 @@ public abstract class AbstractBufferedFileIOManager {
final CompressedFileReference cFileRef =
(CompressedFileReference) fileRef;
final FileReference lafFileRef =
cFileRef.getLAFFileReference();
if (lafFileRef.getFile().exists()) {
- IoUtil.delete(lafFileRef);
+ ioManager.delete(lafFileRef);
}
}
} catch (HyracksDataException e) {
@@ -214,7 +214,7 @@ public abstract class AbstractBufferedFileIOManager {
}
try {
- IoUtil.delete(fileRef);
+ ioManager.delete(fileRef);
} catch (HyracksDataException e) {
if (savedEx != null) {
savedEx.addSuppressed(e);
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 70500e5845..10284f2c4d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -43,7 +43,6 @@ import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.replication.IIOReplicationManager;
import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.api.util.IoUtil;
import
org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.IFileMapManager;
@@ -761,7 +760,7 @@ public class BufferCache implements IBufferCacheInternal,
ILifeCycleComponent, I
} catch (Exception e) {
// If file registration failed for any reason, we need to undo the
file creation
try {
- IoUtil.delete(fileRef);
+ ioManager.delete(fileRef);
} catch (Exception deleteException) {
e.addSuppressed(deleteException);
}
@@ -960,7 +959,7 @@ public class BufferCache implements IBufferCacheInternal,
ILifeCycleComponent, I
if (mapped) {
deleteFile(fileId);
} else {
- BufferedFileHandle.deleteFile(fileRef);
+ BufferedFileHandle.deleteFile(fileRef, ioManager);
}
}
@@ -991,7 +990,7 @@ public class BufferCache implements IBufferCacheInternal,
ILifeCycleComponent, I
fInfo.markAsDeleted();
}
} finally {
- BufferedFileHandle.deleteFile(fileRef);
+ BufferedFileHandle.deleteFile(fileRef, ioManager);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
index 7d0cc62878..d78ac244f9 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
@@ -23,6 +23,7 @@ import java.util.EnumSet;
import org.apache.hyracks.api.compression.ICompressorDecompressor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal;
@@ -64,6 +65,7 @@ public class CompressedFileManager {
private final IBufferCache bufferCache;
private final ICompressorDecompressor compressorDecompressor;
private final CompressedFileReference fileRef;
+ private final IIOManager ioManager;
private int fileId;
private State state;
@@ -71,12 +73,13 @@ public class CompressedFileManager {
private LAFWriter lafWriter;
- public CompressedFileManager(IBufferCache bufferCache,
CompressedFileReference fileRef) {
+ public CompressedFileManager(IBufferCache bufferCache,
CompressedFileReference fileRef, IIOManager ioManager) {
state = State.CLOSED;
totalNumOfPages = 0;
this.bufferCache = bufferCache;
this.fileRef = fileRef;
this.compressorDecompressor = fileRef.getCompressorDecompressor();
+ this.ioManager = ioManager;
}
/* ************************
@@ -99,7 +102,7 @@ public class CompressedFileManager {
ensureState(CLOSED);
boolean open = false;
- if (fileRef.getLAFFileReference().getFile().exists()) {
+ if (ioManager.exists(fileRef.getLAFFileReference())) {
fileId = bufferCache.openFile(fileRef.getLAFFileReference());
open = true;
} else {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index b6178c5f12..01811c7945 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -157,7 +157,7 @@ public class CompressedBufferedFileHandle extends
BufferedFileHandle {
@Override
public void open(FileReference fileRef) throws HyracksDataException {
final CompressedFileReference cFileRef = (CompressedFileReference)
fileRef;
- compressedFileManager = new CompressedFileManager(bufferCache,
cFileRef);
+ compressedFileManager = new CompressedFileManager(bufferCache,
cFileRef, ioManager);
compressedFileManager.open();
super.open(fileRef);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
index ddb57171a1..de8bdc8c75 100644
---
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/AbstractIndexLifecycleTest.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.storage.am.common;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.util.Log4j2Monitor;
import org.apache.logging.log4j.Level;
@@ -97,7 +96,6 @@ public abstract class AbstractIndexLifecycleTest {
index.destroy();
Assert.assertFalse(persistentStateExists());
index.destroy();
- Assert.assertTrue(Log4j2Monitor.count(IoUtil.FILE_NOT_FOUND_MSG) > 0);
Assert.assertFalse(persistentStateExists());
}