This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit cc7d89ccbde59c209fdbe8a819c95fa567ea9224
Author: Murtadha Hubail <[email protected]>
AuthorDate: Sun Jul 9 08:28:45 2023 -0700

    [ASTERIXDB-3220][REPL] Allow waiting for IO on specific dataset partition
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Add API to wait for IO on a specific dataset partition.
    - When waiting for a partition replica's IO operations to finish, wait
      only for that replica's partition rather than for all partitions.
    
    Change-Id: I90f311f602b3c8526556f64d7b25672981fac320
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../common/api/IDatasetLifecycleManager.java       |  6 +++--
 .../common/context/BaseOperationTracker.java       |  8 +++---
 .../apache/asterix/common/context/DatasetInfo.java | 31 ++++++++++++++++++++--
 .../common/context/DatasetLifecycleManager.java    |  4 +--
 .../context/PrimaryIndexOperationTracker.java      |  4 +--
 .../ioopcallbacks/LSMIOOperationCallback.java      |  6 +++--
 .../ioopcallbacks/LSMIOOperationCallbackTest.java  |  7 +++++
 .../replication/sync/ReplicaSynchronizer.java      |  2 +-
 .../SecondaryIndexOperationTrackerFactory.java     |  4 ++-
 9 files changed, 56 insertions(+), 16 deletions(-)

diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 1c2a047d86..c7eee2163e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -161,12 +161,14 @@ public interface IDatasetLifecycleManager extends 
IResourceLifecycleManager<IInd
     void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate 
partitions) throws HyracksDataException;
 
     /**
-     * Waits for all ongoing IO operations on all open datasets that are 
matching {@code replicationStrategy}.
+     * Waits for all ongoing IO operations on all open datasets that are 
matching {@code replicationStrategy} and
+     * {@code partition}.
      *
      * @param replicationStrategy
+     * @param partition
      * @throws HyracksDataException
      */
-    void waitForIO(IReplicationStrategy replicationStrategy) throws 
HyracksDataException;
+    void waitForIO(IReplicationStrategy replicationStrategy, int partition) 
throws HyracksDataException;
 
     /**
      * @return the current datasets io stats
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 5964bb473c..7d3dba4ead 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -33,17 +33,19 @@ public class BaseOperationTracker implements 
ITransactionOperationTracker {
     private static final Logger LOGGER = LogManager.getLogger();
     protected final int datasetID;
     protected final DatasetInfo dsInfo;
+    protected final int partition;
 
-    public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) {
+    public BaseOperationTracker(int datasetID, DatasetInfo dsInfo, int 
partition) {
         this.datasetID = datasetID;
         this.dsInfo = dsInfo;
+        this.partition = partition;
     }
 
     @Override
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, 
ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws 
HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.declareActiveIOOperation(REPLICATE);
+            dsInfo.declareActiveIOOperation(REPLICATE, partition);
         }
     }
 
@@ -59,7 +61,7 @@ public class BaseOperationTracker implements 
ITransactionOperationTracker {
     public void completeOperation(ILSMIndex index, LSMOperationType opType, 
ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws 
HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.undeclareActiveIOOperation(REPLICATE);
+            dsInfo.undeclareActiveIOOperation(REPLICATE, partition);
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index d15d9be927..87a3c2f02f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -33,12 +33,16 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
 public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     private static final Logger LOGGER = LogManager.getLogger();
     // partition -> index
     private final Map<Integer, Set<IndexInfo>> partitionIndexes;
     // resourceID -> index
     private final Map<Long, IndexInfo> indexes;
+    private final Int2IntMap partitionPendingIO;
     private final int datasetID;
     private final ILogManager logManager;
     private final LogRecord waitLog = new LogRecord();
@@ -54,6 +58,7 @@ public class DatasetInfo extends Info implements 
Comparable<DatasetInfo> {
     public DatasetInfo(int datasetID, ILogManager logManager) {
         this.partitionIndexes = new HashMap<>();
         this.indexes = new HashMap<>();
+        this.partitionPendingIO = new Int2IntOpenHashMap();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
         this.setRegistered(false);
@@ -74,7 +79,8 @@ public class DatasetInfo extends Info implements 
Comparable<DatasetInfo> {
         setLastAccess(System.currentTimeMillis());
     }
 
-    public synchronized void 
declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+    public synchronized void 
declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int 
partition) {
+        partitionPendingIO.put(partition, 
partitionPendingIO.getOrDefault(partition, 0) + 1);
         numActiveIOOps++;
         switch (opType) {
             case FLUSH:
@@ -91,7 +97,8 @@ public class DatasetInfo extends Info implements 
Comparable<DatasetInfo> {
         }
     }
 
-    public synchronized void 
undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+    public synchronized void 
undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int 
partition) {
+        partitionPendingIO.put(partition, 
partitionPendingIO.getOrDefault(partition, 0) - 1);
         numActiveIOOps--;
         switch (opType) {
             case FLUSH:
@@ -253,6 +260,26 @@ public class DatasetInfo extends Info implements 
Comparable<DatasetInfo> {
         }
     }
 
+    public void waitForIO(int partition) throws HyracksDataException {
+        logManager.log(waitLog);
+        synchronized (this) {
+            while (partitionPendingIO.getOrDefault(partition, 0) > 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
+                }
+            }
+            if (partitionPendingIO.getOrDefault(partition, 0) < 0) {
+                LOGGER.error("number of IO operations cannot be negative for 
dataset {}, partition {}", this,
+                        partition);
+                throw new IllegalStateException(
+                        "Number of IO operations cannot be negative: " + this 
+ ", partition " + partition);
+            }
+        }
+    }
+
     public synchronized int getPendingFlushes() {
         return pendingFlushes;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 117b4fc4d0..4fc9dd6728 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -552,10 +552,10 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public void waitForIO(IReplicationStrategy replicationStrategy) throws 
HyracksDataException {
+    public void waitForIO(IReplicationStrategy replicationStrategy, int 
partition) throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (dsr.isOpen() && 
replicationStrategy.isMatch(dsr.getDatasetID())) {
-                dsr.getDatasetInfo().waitForIO();
+                dsr.getDatasetInfo().waitForIO(partition);
             }
         }
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b0d8e02af7..a4ad7cf232 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -59,7 +59,6 @@ import org.apache.logging.log4j.Logger;
 @NotThreadSafe
 public class PrimaryIndexOperationTracker extends BaseOperationTracker 
implements IoOperationCompleteListener {
     private static final Logger LOGGER = LogManager.getLogger();
-    private final int partition;
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
     private final ILogManager logManager;
@@ -71,8 +70,7 @@ public class PrimaryIndexOperationTracker extends 
BaseOperationTracker implement
 
     public PrimaryIndexOperationTracker(int datasetID, int partition, 
ILogManager logManager, DatasetInfo dsInfo,
             ILSMComponentIdGenerator idGenerator) {
-        super(datasetID, dsInfo);
-        this.partition = partition;
+        super(datasetID, dsInfo, partition);
         this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
         this.idGenerator = idGenerator;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 1189b51960..f56e5c0a16 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -74,6 +74,7 @@ public class LSMIOOperationCallback implements 
ILSMIOOperationCallback {
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
     protected final DatasetInfo dsInfo;
     protected final ILSMIndex lsmIndex;
+    private final int partition;
     private long firstLsnForCurrentMemoryComponent = 0L;
     private long persistenceLsn = 0L;
     private int pendingFlushes = 0;
@@ -84,6 +85,7 @@ public class LSMIOOperationCallback implements 
ILSMIOOperationCallback {
         this.dsInfo = dsInfo;
         this.lsmIndex = lsmIndex;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+        this.partition = 
ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum();
         componentIds.add(componentId);
     }
 
@@ -259,7 +261,7 @@ public class LSMIOOperationCallback implements 
ILSMIOOperationCallback {
 
     @Override
     public synchronized void scheduled(ILSMIOOperation operation) throws 
HyracksDataException {
-        dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
+        dsInfo.declareActiveIOOperation(operation.getIOOpertionType(), 
partition);
         if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
             pendingFlushes++;
             FlushOperation flush = (FlushOperation) operation;
@@ -282,7 +284,7 @@ public class LSMIOOperationCallback implements 
ILSMIOOperationCallback {
                         pendingFlushes == 0 ? 
firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
             }
         }
-        dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
+        dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType(), 
partition);
     }
 
     public synchronized boolean hasPendingFlush() {
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index db0911bc17..33d513f7e4 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -77,6 +77,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
         String indexId = "mockIndexId";
         
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+        
Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
         DatasetInfo dsInfo = new DatasetInfo(101, null);
         LSMComponentIdGenerator idGenerator = new 
LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, 
mockIndex, idGenerator.getId(),
@@ -140,6 +141,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
         ILSMComponentIdGenerator idGenerator = new 
LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
+        
Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
         ILSMMemoryComponent mockComponent = 
Mockito.mock(AbstractLSMMemoryComponent.class);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, 
mockIndex, idGenerator.getId(),
@@ -161,6 +163,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
         ILSMComponentIdGenerator idGenerator = new 
LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
+        
Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
         ILSMMemoryComponent mockComponent = 
Mockito.mock(AbstractLSMMemoryComponent.class);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, 
mockIndex, idGenerator.getId(),
@@ -221,4 +224,8 @@ public class LSMIOOperationCallbackTest extends TestCase {
         
Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
         return indexCheckpointManagerProvider;
     }
+
+    private static String getIndexPath() {
+        return "storage/partition_0/dataverse/dataset/0/index";
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 459ff01245..68ccd54e50 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -103,6 +103,6 @@ public class ReplicaSynchronizer {
     private void waitForReplicatedDatasetsIO() throws HyracksDataException {
         // wait for IO operations to ensure replicated datasets files won't 
change during replica sync
         final IReplicationStrategy replStrategy = 
appCtx.getReplicationManager().getReplicationStrategy();
-        appCtx.getDatasetLifecycleManager().waitForIO(replStrategy);
+        appCtx.getDatasetLifecycleManager().waitForIO(replStrategy, 
replica.getIdentifier().getPartition());
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index a104ae39fc..827b7131fc 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -21,6 +21,7 @@ package org.apache.asterix.transaction.management.opcallbacks;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.BaseOperationTracker;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IJsonSerializable;
@@ -46,7 +47,8 @@ public class SecondaryIndexOperationTrackerFactory implements 
ILSMOperationTrack
     public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, 
IResource resource) {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) 
ctx.getApplicationContext()).getDatasetLifecycleManager();
-        return new BaseOperationTracker(datasetId, 
dslcManager.getDatasetInfo(datasetId));
+        int partition = 
StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+        return new BaseOperationTracker(datasetId, 
dslcManager.getDatasetInfo(datasetId), partition);
     }
 
     @Override

Reply via email to