This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new db7853c9f3 [ASTERIXDB-3417][STO] Block all I/Os while the clean operation is running
db7853c9f3 is described below

commit db7853c9f3462021df5ba82f89bfdf9086cf935a
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Wed Jun 5 10:25:54 2024 -0700

    [ASTERIXDB-3417][STO] Block all I/Os while the clean operation is running
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    The cleanup operation currently waits for all I/O before it
    proceeds to clean all indexes in a given partition. However,
    waiting alone does not prevent a scheduled LSM operation from
    being triggered afterward, and such an operation could interfere
    with the ongoing cleanup. This change blocks all I/Os on the
    partition's open datasets while the cleanup is running.
    
    Change-Id: Ia168bcb84788a9a6b8444c3bf71c12d62299e5e6
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18341
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../common/api/IDatasetLifecycleManager.java       | 20 ++++--
 .../asterix/common/api/IIOBlockingOperation.java   | 53 +++++++++++++++
 .../apache/asterix/common/context/DatasetInfo.java | 10 ++-
 .../common/context/DatasetLifecycleManager.java    | 29 +++++++-
 .../common/context/NoOpBlockingIOOperation.java    | 46 +++++++++++++
 .../resource/CleanupBlockingIOOperation.java       | 77 ++++++++++++++++++++++
 .../PersistentLocalResourceRepository.java         | 30 +++++----
 7 files changed, 247 insertions(+), 18 deletions(-)

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 070fe6533b..d38b1b7b12 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -87,10 +87,8 @@ public interface IDatasetLifecycleManager extends 
IResourceLifecycleManager<IInd
     DatasetInfo getDatasetInfo(int datasetID);
 
     /**
-     * @param datasetId
-     *            the dataset id to be flushed.
-     * @param asyncFlush
-     *            a flag indicating whether to wait for the flush to complete 
or not.
+     * @param datasetId  the dataset id to be flushed.
+     * @param asyncFlush a flag indicating whether to wait for the flush to 
complete or not.
      * @throws HyracksDataException
      */
     void flushDataset(int datasetId, boolean asyncFlush) throws 
HyracksDataException;
@@ -171,6 +169,18 @@ public interface IDatasetLifecycleManager extends 
IResourceLifecycleManager<IInd
      */
     void waitForIO(IReplicationStrategy replicationStrategy, int partition) 
throws HyracksDataException;
 
+    /**
+     * Waits for all ongoing IO operations on all open datasets and atomically 
performs the provided {@code operation}
+     * on each opened index before allowing any I/Os to go through.
+     *
+     * @param replicationStrategy replication strategy
+     * @param partition           partition to perform the required operation 
against
+     * @param operation           operation to perform
+     */
+
+    void waitForIOAndPerform(IReplicationStrategy replicationStrategy, int 
partition, IIOBlockingOperation operation)
+            throws HyracksDataException;
+
     /**
      * @return the current datasets io stats
      */
@@ -178,6 +188,7 @@ public interface IDatasetLifecycleManager extends 
IResourceLifecycleManager<IInd
 
     /**
      * Closes {@code resourcePath} if open
+     *
      * @param resourcePath
      * @throws HyracksDataException
      */
@@ -185,6 +196,7 @@ public interface IDatasetLifecycleManager extends 
IResourceLifecycleManager<IInd
 
     /**
      * Removes all memory references of {@code partition}
+     *
      * @param partitionId
      */
     void closePartition(int partitionId);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IIOBlockingOperation.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IIOBlockingOperation.java
new file mode 100644
index 0000000000..e2f4c91092
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IIOBlockingOperation.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.Collection;
+
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link DatasetLifecycleManager#waitForIOAndPerform(IReplicationStrategy, 
int, IIOBlockingOperation)} operation,
+ * which can be executed while the I/O is blocked for each open {@link 
DatasetInfo}
+ */
+public interface IIOBlockingOperation {
+
+    /**
+     * Prepares for calling {@link #perform(Collection)} on the provided 
{@code partition}.
+     */
+    void beforeOperation() throws HyracksDataException;
+
+    /**
+     * Performs the required operations. The operation will be performed in a 
{@code synchronize} block on
+     * {@link DatasetInfo}, which would block all operations on the dataset
+     *
+     * @param indexes to perform the operation against
+     * @see DatasetInfo#waitForIOAndPerform(int, IIOBlockingOperation)
+     */
+    void perform(Collection<IndexInfo> indexes) throws HyracksDataException;
+
+    /**
+     * After calling {@link #perform(Collection)}, this should be invoked to 
perform any necessary clean up
+     */
+    void afterOperation() throws HyracksDataException;
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 87a3c2f02f..9064db5bbf 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.api.IIOBlockingOperation;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
@@ -260,7 +261,7 @@ public class DatasetInfo extends Info implements 
Comparable<DatasetInfo> {
         }
     }
 
-    public void waitForIO(int partition) throws HyracksDataException {
+    public void waitForIOAndPerform(int partition, IIOBlockingOperation 
operation) throws HyracksDataException {
         logManager.log(waitLog);
         synchronized (this) {
             while (partitionPendingIO.getOrDefault(partition, 0) > 0) {
@@ -271,6 +272,13 @@ public class DatasetInfo extends Info implements 
Comparable<DatasetInfo> {
                     throw HyracksDataException.create(e);
                 }
             }
+
+            Set<IndexInfo> indexes = partitionIndexes.get(partition);
+            if (indexes != null) {
+                // Perform the required operation
+                operation.perform(indexes);
+            }
+
             if (partitionPendingIO.getOrDefault(partition, 0) < 0) {
                 LOGGER.error("number of IO operations cannot be negative for 
dataset {}, partition {}", this,
                         partition);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 07801a90a8..3e0fbd43e9 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -31,6 +31,7 @@ import java.util.function.IntPredicate;
 import java.util.function.Predicate;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.IIOBlockingOperation;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
@@ -571,11 +572,37 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
     public void waitForIO(IReplicationStrategy replicationStrategy, int 
partition) throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (dsr.isOpen() && 
replicationStrategy.isMatch(dsr.getDatasetID())) {
-                dsr.getDatasetInfo().waitForIO(partition);
+                // Do a simple wait without any operation
+                dsr.getDatasetInfo().waitForIOAndPerform(partition, 
NoOpBlockingIOOperation.INSTANCE);
             }
         }
     }
 
+    /**
+     * Waits for all ongoing IO operations on all open datasets and atomically 
performs the provided {@code operation}
+     * on each opened index before allowing any I/Os to go through.
+     *
+     * @param replicationStrategy replication strategy
+     * @param partition           partition to perform the required operation 
against
+     * @param operation           operation to perform
+     */
+    @Override
+    public void waitForIOAndPerform(IReplicationStrategy replicationStrategy, 
int partition,
+            IIOBlockingOperation operation) throws HyracksDataException {
+        // Signal the operation will be performed
+        operation.beforeOperation();
+
+        for (DatasetResource dsr : datasets.values()) {
+            if (dsr.isOpen() && 
replicationStrategy.isMatch(dsr.getDatasetID())) {
+                // Wait for all I/Os and then perform the requested operation
+                dsr.getDatasetInfo().waitForIOAndPerform(partition, operation);
+            }
+        }
+
+        // Signal the operation has been performed
+        operation.afterOperation();
+    }
+
     @Override
     public StorageIOStats getDatasetsIOStats() {
         StorageIOStats stats = new StorageIOStats();
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/NoOpBlockingIOOperation.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/NoOpBlockingIOOperation.java
new file mode 100644
index 0000000000..ad42f7a39a
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/NoOpBlockingIOOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.Collection;
+
+import org.apache.asterix.common.api.IIOBlockingOperation;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class NoOpBlockingIOOperation implements IIOBlockingOperation {
+    public static final NoOpBlockingIOOperation INSTANCE = new 
NoOpBlockingIOOperation();
+
+    private NoOpBlockingIOOperation() {
+    }
+
+    @Override
+    public void beforeOperation() throws HyracksDataException {
+        // NoOp
+    }
+
+    @Override
+    public void perform(Collection<IndexInfo> indexes) throws 
HyracksDataException {
+        // NoOp
+    }
+
+    @Override
+    public void afterOperation() throws HyracksDataException {
+        // NoOp
+    }
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/CleanupBlockingIOOperation.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/CleanupBlockingIOOperation.java
new file mode 100644
index 0000000000..0790a22b52
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/CleanupBlockingIOOperation.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.resource;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IIOBlockingOperation;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+class CleanupBlockingIOOperation implements IIOBlockingOperation {
+    private final int partition;
+    private final PersistentLocalResourceRepository localRepository;
+    private final IIOManager ioManager;
+    private final Set<FileReference> cleanedIndexes;
+
+    public CleanupBlockingIOOperation(int partition, 
PersistentLocalResourceRepository localRepository,
+            IIOManager ioManager) {
+        this.partition = partition;
+        this.localRepository = localRepository;
+        this.ioManager = ioManager;
+        cleanedIndexes = new HashSet<>();
+    }
+
+    @Override
+    public void beforeOperation() throws HyracksDataException {
+        // NoOp
+    }
+
+    /**
+     * Clean all active indexes while the DatasetInfo is synchronized
+     *
+     * @param indexes active indexes to clean
+     */
+    @Override
+    public void perform(Collection<IndexInfo> indexes) throws 
HyracksDataException {
+        for (IndexInfo indexInfo : indexes) {
+            FileReference index = 
ioManager.resolve(indexInfo.getLocalResource().getPath());
+            localRepository.cleanupIndex(index);
+            cleanedIndexes.add(index);
+        }
+    }
+
+    /**
+     * Clean all inactive indexes while the DatasetLifeCycleManager is 
synchronized
+     */
+    @Override
+    public void afterOperation() throws HyracksDataException {
+        Set<FileReference> indexes = 
localRepository.getPartitionIndexes(partition);
+        for (FileReference index : indexes) {
+            if (!cleanedIndexes.contains(index)) {
+                localRepository.cleanupIndex(index);
+            }
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 39a88413de..d0b4aa1beb 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -561,24 +561,30 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     public void cleanup(int partition) throws HyracksDataException {
         beforeReadAccess();
         try {
-            
datasetLifecycleManager.waitForIO(AllDatasetsReplicationStrategy.INSTANCE, 
partition);
-            final Set<FileReference> partitionIndexes = 
getPartitionIndexes(partition);
-            try {
-                for (FileReference index : partitionIndexes) {
-                    deleteIndexMaskedFiles(index);
-                    if (isValidIndex(index)) {
-                        deleteIndexInvalidComponents(index);
-                    }
-                }
-            } catch (IOException | ParseException e) {
-                throw HyracksDataException.create(e);
-            }
+            CleanupBlockingIOOperation cleanupOp = new 
CleanupBlockingIOOperation(partition, this, ioManager);
+            
datasetLifecycleManager.waitForIOAndPerform(AllDatasetsReplicationStrategy.INSTANCE,
 partition, cleanupOp);
         } finally {
             clearResourcesCache();
             afterReadAccess();
         }
     }
 
+    /**
+     * This will be invoked by {@link CleanupBlockingIOOperation}
+     *
+     * @param index to clean
+     */
+    void cleanupIndex(FileReference index) throws HyracksDataException {
+        try {
+            deleteIndexMaskedFiles(index);
+            if (isValidIndex(index)) {
+                deleteIndexInvalidComponents(index);
+            }
+        } catch (IOException | ParseException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
     public List<ResourceStorageStats> getStorageStats() throws 
HyracksDataException {
         beforeReadAccess();
         try {

Reply via email to