This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5aa6c089bd [ASTERIXDB-3417][STO] Block all I/Os while the clean
operation is running
5aa6c089bd is described below
commit 5aa6c089bd968ea2465596c1c56c898d4c6d4656
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Tue Jun 4 14:26:05 2024 -0700
[ASTERIXDB-3417][STO] Block all I/Os while the clean operation is running
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
The cleanup operation currently waits for all I/O before it
proceeds to clean all indexes in a given partition. However,
this won't prevent any scheduled LSM operation from being
triggered. This could interfere with an ongoing cleanup operation.
To prevent that, the cleanup is now performed while the I/O of
each open dataset is blocked.
Change-Id: I40e8b3769e4f59d6e8e21eeb3875df56430aee0d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18338
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../common/api/IDatasetLifecycleManager.java | 22 +++++--
.../asterix/common/api/IIOBlockingOperation.java | 53 +++++++++++++++
.../apache/asterix/common/context/DatasetInfo.java | 10 ++-
.../common/context/DatasetLifecycleManager.java | 31 ++++++++-
.../common/context/NoOpBlockingIOOperation.java | 46 +++++++++++++
.../resource/CleanupBlockingIOOperation.java | 77 ++++++++++++++++++++++
.../PersistentLocalResourceRepository.java | 30 +++++----
7 files changed, 251 insertions(+), 18 deletions(-)
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 070fe6533b..091712b473 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -87,10 +87,8 @@ public interface IDatasetLifecycleManager extends
IResourceLifecycleManager<IInd
DatasetInfo getDatasetInfo(int datasetID);
/**
- * @param datasetId
- * the dataset id to be flushed.
- * @param asyncFlush
- * a flag indicating whether to wait for the flush to complete
or not.
+ * @param datasetId the dataset id to be flushed.
+ * @param asyncFlush a flag indicating whether to wait for the flush to
complete or not.
* @throws HyracksDataException
*/
void flushDataset(int datasetId, boolean asyncFlush) throws
HyracksDataException;
@@ -171,6 +169,20 @@ public interface IDatasetLifecycleManager extends
IResourceLifecycleManager<IInd
*/
void waitForIO(IReplicationStrategy replicationStrategy, int partition)
throws HyracksDataException;
+ /**
+ * Waits for all ongoing IO operations on all open datasets and atomically
performs the provided {@code operation}
+ * on each opened index before allowing any I/Os to go through.
+ * <p>
+ * <b>NOTE: This may be a synchronized call</b>
+ *
+ * @param replicationStrategy replication strategy
+ * @param partition partition to perform the required operation
against
+ * @param operation operation to perform
+ */
+
+ void waitForIOAndPerform(IReplicationStrategy replicationStrategy, int
partition, IIOBlockingOperation operation)
+ throws HyracksDataException;
+
/**
* @return the current datasets io stats
*/
@@ -178,6 +190,7 @@ public interface IDatasetLifecycleManager extends
IResourceLifecycleManager<IInd
/**
* Closes {@code resourcePath} if open
+ *
* @param resourcePath
* @throws HyracksDataException
*/
@@ -185,6 +198,7 @@ public interface IDatasetLifecycleManager extends
IResourceLifecycleManager<IInd
/**
* Removes all memory references of {@code partition}
+ *
* @param partitionId
*/
void closePartition(int partitionId);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IIOBlockingOperation.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IIOBlockingOperation.java
new file mode 100644
index 0000000000..e2f4c91092
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IIOBlockingOperation.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.Collection;
+
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link DatasetLifecycleManager#waitForIOAndPerform(IReplicationStrategy,
int, IIOBlockingOperation)} operation,
+ * which can be executed while the I/O is blocked for each open {@link
DatasetInfo}
+ */
+public interface IIOBlockingOperation {
+
+ /**
+ * Prepares for calling {@link #perform(Collection)} on the provided
{@code partition}.
+ */
+ void beforeOperation() throws HyracksDataException;
+
+ /**
+ * Performs the required operations. The operation will be performed in a
{@code synchronized} block on
+ * {@link DatasetInfo}, which would block all operations on the dataset
+ *
+ * @param indexes to perform the operation against
+ * @see DatasetInfo#waitForIOAndPerform(int, IIOBlockingOperation)
+ */
+ void perform(Collection<IndexInfo> indexes) throws HyracksDataException;
+
+ /**
+ * After calling {@link #perform(Collection)}, this should be invoked to
perform any necessary clean up
+ */
+ void afterOperation() throws HyracksDataException;
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 87a3c2f02f..9064db5bbf 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.api.IIOBlockingOperation;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
@@ -260,7 +261,7 @@ public class DatasetInfo extends Info implements
Comparable<DatasetInfo> {
}
}
- public void waitForIO(int partition) throws HyracksDataException {
+ public void waitForIOAndPerform(int partition, IIOBlockingOperation
operation) throws HyracksDataException {
logManager.log(waitLog);
synchronized (this) {
while (partitionPendingIO.getOrDefault(partition, 0) > 0) {
@@ -271,6 +272,13 @@ public class DatasetInfo extends Info implements
Comparable<DatasetInfo> {
throw HyracksDataException.create(e);
}
}
+
+ Set<IndexInfo> indexes = partitionIndexes.get(partition);
+ if (indexes != null) {
+ // Perform the required operation
+ operation.perform(indexes);
+ }
+
if (partitionPendingIO.getOrDefault(partition, 0) < 0) {
LOGGER.error("number of IO operations cannot be negative for
dataset {}, partition {}", this,
partition);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 07801a90a8..e8ead2bcbe 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -31,6 +31,7 @@ import java.util.function.IntPredicate;
import java.util.function.Predicate;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.IIOBlockingOperation;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
@@ -571,11 +572,39 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
public void waitForIO(IReplicationStrategy replicationStrategy, int
partition) throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
if (dsr.isOpen() &&
replicationStrategy.isMatch(dsr.getDatasetID())) {
- dsr.getDatasetInfo().waitForIO(partition);
+ // Do a simple wait without any operation
+ dsr.getDatasetInfo().waitForIOAndPerform(partition,
NoOpBlockingIOOperation.INSTANCE);
}
}
}
+ /**
+ * Waits for all ongoing IO operations on all open datasets and atomically
performs the provided {@code operation}
+ * on each opened index before allowing any I/Os to go through.
+ * <p>
+ * <b>NOTE: this is a synchronized call to prevent activating new indexes
(i.e., modifying {@link #datasets})</b>
+ *
+ * @param replicationStrategy replication strategy
+ * @param partition partition to perform the required operation
against
+ * @param operation operation to perform
+ */
+ @Override
+ public synchronized void waitForIOAndPerform(IReplicationStrategy
replicationStrategy, int partition,
+ IIOBlockingOperation operation) throws HyracksDataException {
+ // Signal the operation will be performed
+ operation.beforeOperation();
+
+ for (DatasetResource dsr : datasets.values()) {
+ if (dsr.isOpen() &&
replicationStrategy.isMatch(dsr.getDatasetID())) {
+ // Wait for all I/Os and then perform the requested operation
+ dsr.getDatasetInfo().waitForIOAndPerform(partition, operation);
+ }
+ }
+
+ // Signal the operation has been performed
+ operation.afterOperation();
+ }
+
@Override
public StorageIOStats getDatasetsIOStats() {
StorageIOStats stats = new StorageIOStats();
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/NoOpBlockingIOOperation.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/NoOpBlockingIOOperation.java
new file mode 100644
index 0000000000..ad42f7a39a
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/NoOpBlockingIOOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.Collection;
+
+import org.apache.asterix.common.api.IIOBlockingOperation;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class NoOpBlockingIOOperation implements IIOBlockingOperation {
+ public static final NoOpBlockingIOOperation INSTANCE = new
NoOpBlockingIOOperation();
+
+ private NoOpBlockingIOOperation() {
+ }
+
+ @Override
+ public void beforeOperation() throws HyracksDataException {
+ // NoOp
+ }
+
+ @Override
+ public void perform(Collection<IndexInfo> indexes) throws
HyracksDataException {
+ // NoOp
+ }
+
+ @Override
+ public void afterOperation() throws HyracksDataException {
+ // NoOp
+ }
+}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/CleanupBlockingIOOperation.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/CleanupBlockingIOOperation.java
new file mode 100644
index 0000000000..0790a22b52
--- /dev/null
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/CleanupBlockingIOOperation.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.resource;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IIOBlockingOperation;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+class CleanupBlockingIOOperation implements IIOBlockingOperation {
+ private final int partition;
+ private final PersistentLocalResourceRepository localRepository;
+ private final IIOManager ioManager;
+ private final Set<FileReference> cleanedIndexes;
+
+ public CleanupBlockingIOOperation(int partition,
PersistentLocalResourceRepository localRepository,
+ IIOManager ioManager) {
+ this.partition = partition;
+ this.localRepository = localRepository;
+ this.ioManager = ioManager;
+ cleanedIndexes = new HashSet<>();
+ }
+
+ @Override
+ public void beforeOperation() throws HyracksDataException {
+ // NoOp
+ }
+
+ /**
+ * Clean all active indexes while the DatasetInfo is synchronized
+ *
+ * @param indexes active indexes to clean
+ */
+ @Override
+ public void perform(Collection<IndexInfo> indexes) throws
HyracksDataException {
+ for (IndexInfo indexInfo : indexes) {
+ FileReference index =
ioManager.resolve(indexInfo.getLocalResource().getPath());
+ localRepository.cleanupIndex(index);
+ cleanedIndexes.add(index);
+ }
+ }
+
+ /**
+ * Clean all inactive indexes while the DatasetLifecycleManager is
synchronized
+ */
+ @Override
+ public void afterOperation() throws HyracksDataException {
+ Set<FileReference> indexes =
localRepository.getPartitionIndexes(partition);
+ for (FileReference index : indexes) {
+ if (!cleanedIndexes.contains(index)) {
+ localRepository.cleanupIndex(index);
+ }
+ }
+ }
+
+}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 39a88413de..d0b4aa1beb 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -561,24 +561,30 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
public void cleanup(int partition) throws HyracksDataException {
beforeReadAccess();
try {
-
datasetLifecycleManager.waitForIO(AllDatasetsReplicationStrategy.INSTANCE,
partition);
- final Set<FileReference> partitionIndexes =
getPartitionIndexes(partition);
- try {
- for (FileReference index : partitionIndexes) {
- deleteIndexMaskedFiles(index);
- if (isValidIndex(index)) {
- deleteIndexInvalidComponents(index);
- }
- }
- } catch (IOException | ParseException e) {
- throw HyracksDataException.create(e);
- }
+ CleanupBlockingIOOperation cleanupOp = new
CleanupBlockingIOOperation(partition, this, ioManager);
+
datasetLifecycleManager.waitForIOAndPerform(AllDatasetsReplicationStrategy.INSTANCE,
partition, cleanupOp);
} finally {
clearResourcesCache();
afterReadAccess();
}
}
+ /**
+ * This will be invoked by {@link CleanupBlockingIOOperation}
+ *
+ * @param index to clean
+ */
+ void cleanupIndex(FileReference index) throws HyracksDataException {
+ try {
+ deleteIndexMaskedFiles(index);
+ if (isValidIndex(index)) {
+ deleteIndexInvalidComponents(index);
+ }
+ } catch (IOException | ParseException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
public List<ResourceStorageStats> getStorageStats() throws
HyracksDataException {
beforeReadAccess();
try {